4年前 (2021-04-19)  Java系列 |   抢沙发  605 
文章评分 0 次,平均分 0.0

Java9 Reactive Stream反应流式编程

在本文中,我们将研究Java9反应流Reactive Streams。简单地说,我们将能够使用Flow类,它包含用于构建反应流处理逻辑的主要构建块。

反应流是具有非阻塞背压的异步流处理的标准。这个规范在Reactive Manifesto中定义,有各种各样的实现,例如RxJava或Akka Streams。

反应式API概述

为了构建一个流,我们可以使用三个主要的抽象,并将它们组合成异步处理逻辑。

每个流都需要处理发布服务器实例向其发布的事件;发布服务器有一个方法–subscribe()

如果任何订阅者想要接收它发布的事件,他们需要订阅给定的发布者。

消息的接收者需要实现订阅接口。通常,这是每个流处理的结束,因为它的实例不会进一步发送消息。

我们可以把订户看作一个水槽。这有四个需要重写的方法:onSubscribe()onNext()onError()onComplete()

如果我们想转换传入的消息并将其进一步传递给下一个订户,我们需要实现处理器接口。它既充当订阅者,因为它接收消息,又充当发布者,因为它处理这些消息并将其发送以供进一步处理。

发布和使用消息

假设我们想要创建一个简单的流,在这个流中,我们有一个发布消息的发布者,一个简单的订阅者在消息到达时消费消息—一次一个。

让我们创建一个EndSubscriber类。我们需要实现订户接口。接下来,我们将重写所需的方法。

在处理开始之前调用onSubscribe()方法。订阅的实例作为参数传递。它是一个类,用于控制订阅服务器和发布服务器之间的消息流:

public class EndSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
}

我们还初始化了将在测试中使用的consumedElements的空列表。

现在,我们需要从订户接口实现其余的方法。这里的主要方法是onNext()–每当发布者发布新消息时,都会调用此方法:

@Override
public void onNext(T item) {
    System.out.println("Got : " + item);
    subscription.request(1);
}

注意,当我们在onSubscribe()方法中启动订阅时,当我们处理一条消息时,我们需要调用订阅上的request()方法,以表示当前订阅服务器准备使用更多消息。

最后,我们需要实现onError()——在处理过程中抛出异常时调用,以及onComplete()——在发布服务器关闭时调用:

@Override
public void onError(Throwable t) {
    t.printStackTrace();
}

@Override
public void onComplete() {
    System.out.println("Done");
}

让我们为处理流编写一个测试。我们将使用SubmissionPublisher类–来自java.util.concurrent–实现发布者接口。

我们将向发布者提交N个元素—我们的最终订户将收到:

@Test
public void whenSubscribeToIt_thenShouldConsumeAll() 
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>();
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(
         () -> assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(items)
     );
}

注意,我们在EndSubscriber实例上调用close()方法。它将在给定发布服务器的每个订阅服务器上调用下面的onComplete()回调。

运行该程序将产生以下输出:

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done

信息的转换

假设我们希望在发布者和订阅者之间构建类似的逻辑,但也应用一些转换。

我们将创建TransformProcessor类来实现Processor并扩展SubmissionPublisher,因为它将同时是PublisherSubscriber

我们将传入一个将输入转换为输出的函数:

public class TransformProcessor<T, R> 
  extends SubmissionPublisher<R> 
  implements Flow.Processor<T, R> {

    private Function<T, R> function;
    private Flow.Subscription subscription;

    public TransformProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(function.apply(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        close();
    }
}

现在,让我们用一个处理流编写一个快速测试,发布者在其中发布字符串元素。

我们的TransformProcessor将把字符串解析为整数– 这意味着需要在这里进行转换:

@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> transformProcessor 
      = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> subscriber = new EndSubscriber<>();
    List<String> items = List.of("1", "2", "3");
    List<Integer> expectedResult = List.of(1, 2, 3);

    // when
    publisher.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);
    items.forEach(publisher::submit);
    publisher.close();

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(() -> 
         assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(expectedResult)
     );
}

请注意,调用基本发布服务器上的close()方法将导致调用TransformProcessor上的onComplete()方法。

请记住,处理链中的所有发布者都需要以这种方式关闭。

使用订阅控制消息需求

假设我们只想使用订阅中的第一个元素,应用一些逻辑并完成处理。我们可以使用request()方法来实现这一点。

让我们修改EndSubscriber,使其只使用N条消息。我们将传递该数字作为howmuchmessagescompe构造函数参数:

public class EndSubscriber<T> implements Subscriber<T> {
 
    private AtomicInteger howMuchMessagesConsume;
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    public EndSubscriber(Integer howMuchMessagesConsume) {
        this.howMuchMessagesConsume 
          = new AtomicInteger(howMuchMessagesConsume);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        howMuchMessagesConsume.decrementAndGet();
        System.out.println("Got : " + item);
        consumedElements.add(item);
        if (howMuchMessagesConsume.get() > 0) {
            subscription.request(1);
        }
    }
    //...
    
}

我们可以要求元素,只要我们想。

让我们编写一个测试,其中我们只希望使用给定订阅中的一个元素:

@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>(1);
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    List<String> expected = List.of("1");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

    // then
    await().atMost(1000, TimeUnit.MILLISECONDS)
      .until(() -> 
        assertThat(subscriber.consumedElements)
       .containsExactlyElementsOf(expected)
    );
}

尽管发布者发布了六个元素,但是我们的EndSubscriber将只消耗一个元素,因为它只发出处理单个元素的信号。

通过对订阅使用request()方法,我们可以实现更复杂的背压机制来控制消息消耗的速度。

小结

在本文中,我们了解了Java9反应流

我们看到了如何创建由发布者和订阅者组成的处理流。我们创建了一个更复杂的处理流程,使用处理器转换元素。

最后,我们使用订阅来控制订阅服务器对元素的需求。

所有这些示例和代码片段的实现都可以在GitHub项目中找到:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-9-new-features

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1834.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册