在本文中,我们将研究Java9反应流Reactive Streams。简单地说,我们将能够使用Flow
类,它包含用于构建反应流处理逻辑的主要构建块。
反应流是具有非阻塞背压的异步流处理的标准。这个规范在Reactive Manifesto中定义,有各种各样的实现,例如RxJava或Akka Streams。
反应式API概述
为了构建一个流,我们可以使用三个主要的抽象,并将它们组合成异步处理逻辑。
每个流都需要处理发布服务器实例向其发布的事件;发布服务器有一个方法–subscribe()
。
如果任何订阅者想要接收它发布的事件,他们需要订阅给定的发布者。
消息的接收者需要实现订阅接口。通常,这是每个流处理的结束,因为它的实例不会进一步发送消息。
我们可以把订户看作一个水槽。这有四个需要重写的方法:onSubscribe()
、onNext()
、onError()
和onComplete()
。
如果我们想转换传入的消息并将其进一步传递给下一个订户,我们需要实现处理器接口。它既充当订阅者,因为它接收消息,又充当发布者,因为它处理这些消息并将其发送以供进一步处理。
发布和使用消息
假设我们想要创建一个简单的流,在这个流中,我们有一个发布消息的发布者,一个简单的订阅者在消息到达时消费消息—一次一个。
让我们创建一个EndSubscriber
类。我们需要实现订户接口。接下来,我们将重写所需的方法。
在处理开始之前调用onSubscribe()
方法。订阅的实例作为参数传递。它是一个类,用于控制订阅服务器和发布服务器之间的消息流:
public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
我们还初始化了将在测试中使用的consumedElements
的空列表。
现在,我们需要从订户接口实现其余的方法。这里的主要方法是onNext()
–每当发布者发布新消息时,都会调用此方法:
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
注意,当我们在onSubscribe()
方法中启动订阅时,当我们处理一条消息时,我们需要调用订阅上的request()
方法,以表示当前订阅服务器准备使用更多消息。
最后,我们需要实现onError()
——在处理过程中抛出异常时调用,以及onComplete()
——在发布服务器关闭时调用:
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
让我们为处理流编写一个测试。我们将使用SubmissionPublisher
类–来自java.util.concurrent
–实现发布者接口。
我们将向发布者提交N个元素—我们的最终订户将收到:
@Test
public void whenSubscribeToIt_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(items)
);
}
注意,我们在EndSubscriber
实例上调用close()
方法。它将在给定发布服务器的每个订阅服务器上调用下面的onComplete()
回调。
运行该程序将产生以下输出:
Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done
信息的转换
假设我们希望在发布者和订阅者之间构建类似的逻辑,但也应用一些转换。
我们将创建TransformProcessor
类来实现Processor
并扩展SubmissionPublisher
,因为它将同时是Publisher
和Subscriber
。
我们将传入一个将输入转换为输出的函数:
public class TransformProcessor<T, R>
extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
现在,让我们用一个处理流编写一个快速测试,发布者在其中发布字符串元素。
我们的TransformProcessor
将把字符串解析为整数– 这意味着需要在这里进行转换:
@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor
= new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
// when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expectedResult)
);
}
请注意,调用基本发布服务器上的close()
方法将导致调用TransformProcessor
上的onComplete()
方法。
请记住,处理链中的所有发布者都需要以这种方式关闭。
使用订阅控制消息需求
假设我们只想使用订阅中的第一个元素,应用一些逻辑并完成处理。我们可以使用request()
方法来实现这一点。
让我们修改EndSubscriber
,使其只使用N条消息。我们将传递该数字作为howmuchmessagescompe
构造函数参数:
public class EndSubscriber<T> implements Subscriber<T> {
private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesConsume.get() > 0) {
subscription.request(1);
}
}
//...
}
我们可以要求元素,只要我们想。
让我们编写一个测试,其中我们只希望使用给定订阅中的一个元素:
@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expected)
);
}
尽管发布者发布了六个元素,但是我们的EndSubscriber
将只消耗一个元素,因为它只发出处理单个元素的信号。
通过对订阅使用request()
方法,我们可以实现更复杂的背压机制来控制消息消耗的速度。
小结
在本文中,我们了解了Java9反应流。
我们看到了如何创建由发布者和订阅者组成的处理流。我们创建了一个更复杂的处理流程,使用处理器转换元素。
最后,我们使用订阅来控制订阅服务器对元素的需求。
所有这些示例和代码片段的实现都可以在GitHub项目中找到:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-9-new-features
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1834.html
暂无评论