3年前 (2021-07-06)  Java系列 |   抢沙发  1129 
文章评分 0 次,平均分 0.0

在本文中,将通过一个示例了解Java9中的FlowAPI如何帮助我们使用新的发布者和订阅者接口构建反应模式。通过本文你可以了解这种新的编程风格及其优缺点。所有代码都在GitHub上:https://github.com/mechero/java-9-flow-reactive,因此您也可以自己运行或尝试一些修改。

Java9的Flow API简介

Java9在这个古老但非常流行的编程语言中引入了一些有趣的新特性。本指南主要介绍新的Flow API,它使我们能够采用只使用JDK的反应式编程,而不需要额外的库,如RxJava或projectreactor等。

然而,在看过API之后,您很快就会意识到它基本上就是它所承诺的:API。它由几个接口和一个实现组成:

  • 接口 Flow.Publisher定义了产生项目和控制信号的方法。
  • 接口 Flow.Subscriber定义了接收这些消息和信号的方法。
  • 接口 Subscription定义了链接发布服务器和订阅服务器的方法。
  • 接口 Flow.Processor定义了一些高级操作的方法,比如将项目从发布者链接到订阅者。
  • 最后,SubmissionPublisher类实现了Flow.Publisher,它是一个灵活的项目生产者,符合reactivestreams计划。

尽管可以使用的类不多,但包括Java9中的这个API是一个重大变化:供应商和第三方可以依赖这些接口为其库提供响应式支持,例如从JDBC驱动程序到RabbitMQ响应式实现。

从Pull到Push再到Pull-Push

反应式编程,如果我试着把它缩小到一个段落,是一种编程方式,在这种方式中,使用者控制着数据流,特别重视这样一个事实,即可能有一些较慢的使用者需要发布者放慢速度,以便能够读取数据流中的所有项目(背压概念)。这不是一种破坏性的技术;您本可以已经使用这种模式,但由于其在主要框架和库发行版(如Java9或Spring5)中的集成,以及需要相互通信的大量数据的分布式系统的兴起,这种模式现在变得流行起来。

回顾过去也有助于我们理解它的兴起。几年前,从消费者那里获取数据的最流行的技术是基于拉的机制。客户机定期轮询数据,如果可用,则读取数据。优点是在资源较少的情况下可以控制数据流(停止轮询);主要的缺点是在没有什么可消耗的情况下通过轮询数据来浪费处理时间和/或网络资源。

随着时间的推移,这一趋势发生了变化,从生产商那里推送数据并让消费者来处理变得流行起来。问题是,消费者可能比生产者拥有更有限的资源,在消费者速度慢的情况下,最终会得到完整的输入缓冲区,从而导致数据丢失。这可能是好的,如果它发生在我们的订户比例很低,但如果它发生在大多数人呢?我们最好放慢发行速度…

反应式编程附带的混合拉-推方法试图实现这两个方面的最佳效果:它让使用者负责请求数据和控制来自发布者的流,发布者还可以在缺少资源的情况下决定是阻塞数据还是丢弃数据。下面我们将看到一个很好的实际例子。

Flow 和 a Stream的区别

反应式编程并不是取代函数式编程的新炒作。两者兼容,完美配合。java8中引入的Streams API非常适合处理数据流(mapreduce和所有的变体),而flowapi则适用于通信端(请求、减速、删除、阻塞等)。您可以使用流作为Publisher的数据源,根据需要阻止它们或删除项目。您还可以在订阅服务器端使用它们,例如,在收到某些项目后执行聚合。更不用说那些反应流不适合的编程逻辑了,但是它可以用函数式编写,可读性和维护性是命令式编程的十倍。

但有一部分往往让我们困惑:如果您需要在两个系统之间应用转换来交换数据(而不是在项目发布之前),那么流和流如何协同工作?在这种情况下,我们可以使用Java8函数将源映射到目标(转换它),但不能在发布者和订阅者之间使用流,对吗?作为天才,我们可能会想到在两者之间创建一个订阅者,它从原始发布者那里获取项目,转换它,然后像发布者一样工作,成为原始订阅者订阅的对象。好消息:这就是java9Flow.Processor的模式,所以我们只需要实现这个接口并在那里编写函数来转换数据。

就我个人而言,我不喜欢完全被动、过度被动或是被动的福音传道者(我不能确定具体的术语)。尽量不要对它发疯,避免用流替换流,因为这样做没有意义。从技术上讲,可以在一个类方法中编写一些代码行,这些代码行接受一个int数组,创建一个发布服务器,然后在它旁边创建一个处理器(或转换器),将每个int映射到一个字符串,最后创建一个订阅服务器,将它们收集到一个长字符串消息中,例如。但是,你为什么要这么做?您不需要控制系统的两个部分或两个线程之间的流,那么,为什么要使代码更复杂呢?在这种情况下,最好使用流。

杂志出版商

Java9 Flow-Reactive Stream编程

本文中包含的示例代码模拟了一个杂志出版商的用例。出版商只有两个订户(不过,这是个好的开始)。

出版商将为每位订户制作20本系列杂志。他们知道他们的读者在投递的时候通常不在家,他们想避免邮递员把杂志退回或扔掉。这可能是因为发布者知道订阅者的邮箱通常很小,无法放置更多邮件(订阅者的缓冲区)。

取而代之的是,他们实施了一个非常创新的投递系统:订阅者在家时给他们打电话,他们几乎立即投递一本杂志(订阅者的下一本杂志)。出版商计划在办公室为每位订阅者保留一个小盒子,以防他们中的一些人一出版杂志就不打电话来拿。经理认为在出版商的办公室为每个订阅者保留最多8本杂志的空间就足够了(注意现在出版商的缓冲区是怎样的)。

然后,其中一名工人到经理办公室警告他们不同的情况:

1. 如果订阅者要求投递的速度足够快,就像出版商印刷新杂志一样快,就不会有空间问题。

2. 如果订阅者打电话的速度和杂志印刷的速度不一样,箱子可能就满了。工人问经理他们应该如何应对这种情况:

3. 将框的大小增加到每个订阅者20个,这将解决问题(即,在发布服务器端提供更多资源)。

4. 停止打印,直到情况得到解决(订阅者至少请求一个),然后放慢速度,这对某些订阅者造成了损害,因为这些订阅者的速度可能足够快,可以让他们的盒子保持空的状态。

5. 把任何不适合订户的杂志在制作后立即扔进回收站。

6. 中间解决方案:如果任何一个盒子都满了,在打印下一个数字之前等待最长时间。如果在这段时间之后还没有空间,那么他们将回收(丢弃)新号码。

经理解释说,他们负担不起第一种解决方案,花费如此多的资源来应付缓慢的用户将是一种浪费,并决定选择礼貌等待(d),这可能会损害一些用户,但只是为了缩短时间。营销团队决定称这种方法为反应式杂志出版,因为它“适应读者”。提出上述分析的工人成为当月最佳雇员。

请注意,我选择了这个解决方案(作为一个贪婪的杂志出版商),因为如果我们使用一个几乎无限的缓冲区,就很难理解反应式方法的所有概念,代码看起来简单明了,很难与其他解决方案进行比较。让我们看看密码。

Java 9 Flow 示例代码

代码地址:https://github.com/mechero/java-9-flow-reactive

完全控制流的简单订户

让我们从订户开始。MagazineSubscriber类实现Flow.Subscriber<Integer>(他们只会收到一个数字,但是让我们继续想象这是一本关于您喜欢的主题的非常好的杂志)。

package com.thepracticaldeveloper;

import java.util.concurrent.Flow;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MagazineSubscriber implements Flow.Subscriber<Integer> {

  public static final String JACK = "Jack";
  public static final String PETE = "Pete";

  private static final Logger log = LoggerFactory.
    getLogger(MagazineSubscriber.class);

  private final long sleepTime;
  private final String subscriberName;
  private Flow.Subscription subscription;
  private int nextMagazineExpected;
  private int totalRead;

  MagazineSubscriber(final long sleepTime, final String subscriberName) {
    this.sleepTime = sleepTime;
    this.subscriberName = subscriberName;
    this.nextMagazineExpected = 1;
    this.totalRead = 0;
  }

  @Override
  public void onSubscribe(final Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(final Integer magazineNumber) {
    if (magazineNumber != nextMagazineExpected) {
      IntStream.range(nextMagazineExpected, magazineNumber).forEach(
        (msgNumber) ->
          log("Oh no! I missed the magazine " + msgNumber)
      );
      // Catch up with the number to keep tracking missing ones
      nextMagazineExpected = magazineNumber;
    }
    log("Great! I got a new magazine: " + magazineNumber);
    takeSomeRest();
    nextMagazineExpected++;
    totalRead++;

    log("I'll get another magazine now, next one should be: " +
      nextMagazineExpected);
    subscription.request(1);
  }

  @Override
  public void onError(final Throwable throwable) {
    log("Oops I got an error from the Publisher: " + throwable.getMessage());
  }

  @Override
  public void onComplete() {
    log("Finally! I completed the subscription, I got in total " +
      totalRead + " magazines.");
  }

  private void log(final String logMessage) {
    log.info("<=========== [" + subscriberName + "] : " + logMessage);
  }

  public String getSubscriberName() {
    return subscriberName;
  }

  private void takeSomeRest() {
    try {
      Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

此类实现所需的接口方法:

  • onSubscribe(subcription)发布服务器将在获取新订阅服务器时调用此方法。通常您希望保存订阅,因为它稍后将用于向发布者发送信号:请求更多项目,或取消订阅。像我们在这里所做的那样,立即使用它来请求第一项也是很常见的。
  • onNext(magazineNumber)每当接收到新项时,就会调用此方法。在我们的例子中,我们还将遵循一个典型的场景,除了处理该项之外,我们还将请求一个新的。然而,在这两者之间,我们包含了一个睡眠时间,它在创建订户时是可配置的。这样,我们可以尝试不同的场景,看看当订户行为不正常时会发生什么。额外的逻辑仅仅是记录丢失的杂志以防丢失:我们提前知道序列,以便订阅者可以检测到何时发生这种情况。
  • onError(throwable)发布者调用它是为了告诉订阅者出了问题。在我们的实现中,我们只记录消息,因为发布者删除一个项目时会发生这种情况。
  • onComplete()。这一点也不奇怪:当发布者没有更多的项目要发送时,就会调用这个函数,这样订阅就完成了。

使用Java9 SubmissionPublisher

要构建发布服务器,我们将使用Java9的SubmissionPublisher类。正如Javadoc中所述,它为发布者实现了reactivestreams主动性的原则,发布者可能会在订户速度慢时阻塞,或者也可能会在需要时丢弃项目。让我们先看看代码,然后深入了解细节:

package com.thepracticaldeveloper;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReactiveFlowApp {

  private static final int NUMBER_OF_MAGAZINES = 20;
  private static final long MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 2;
  private static final Logger log =
    LoggerFactory.getLogger(ReactiveFlowApp.class);

  public static void main(String[] args) throws Exception {
    final ReactiveFlowApp app = new ReactiveFlowApp();

    log.info("\n\n### CASE 1: Subscribers are fast, buffer size is not so " +
      "important in this case.");
    app.magazineDeliveryExample(100L, 100L, 8);

    log.info("\n\n### CASE 2: A slow subscriber, but a good enough buffer " +
      "size on the publisher's side to keep all items until they're picked up");
    app.magazineDeliveryExample(1000L, 3000L, NUMBER_OF_MAGAZINES);

    log.info("\n\n### CASE 3: A slow subscriber, and a very limited buffer " +
      "size on the publisher's side so it's important to keep the slow " +
      "subscriber under control");
    app.magazineDeliveryExample(1000L, 3000L, 8);

  }

  void magazineDeliveryExample(final long sleepTimeJack,
                               final long sleepTimePete,
                               final int maxStorageInPO) throws Exception {
    final SubmissionPublisher<Integer> publisher =
      new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxStorageInPO);

    final MagazineSubscriber jack = new MagazineSubscriber(
      sleepTimeJack,
      MagazineSubscriber.JACK
    );
    final MagazineSubscriber pete = new MagazineSubscriber(
      sleepTimePete,
      MagazineSubscriber.PETE
    );

    publisher.subscribe(jack);
    publisher.subscribe(pete);

    log.info("Printing 20 magazines per subscriber, with room in publisher for "
      + maxStorageInPO + ". They have " + MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE +
      " seconds to consume each magazine.");
    IntStream.rangeClosed(1, 20).forEach((number) -> {
      log.info("Offering magazine " + number + " to consumers");
      final int lag = publisher.offer(
        number,
        MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE,
        TimeUnit.SECONDS,
        (subscriber, msg) -> {
          subscriber.onError(
            new RuntimeException("Hey " + ((MagazineSubscriber) subscriber)
              .getSubscriberName() + "! You are too slow getting magazines" +
              " and we don't have more space for them! " +
              "I'll drop your magazine: " + msg));
          return false; // don't retry, we don't believe in second opportunities
        });
      if (lag < 0) {
        log("Dropping " + -lag + " magazines");
      } else {
        log("The slowest consumer has " + lag +
          " magazines in total to be picked up");
      }
    });

    // Blocks until all subscribers are done (this part could be improved
    // with latches, but this way we keep it simple)
    while (publisher.estimateMaximumLag() > 0) {
      Thread.sleep(500L);
    }

    // Closes the publisher, calling the onComplete() method on every subscriber
    publisher.close();
    // give some time to the slowest consumer to wake up and notice
    // that it's completed
    Thread.sleep(Math.max(sleepTimeJack, sleepTimePete));
  }

  private static void log(final String message) {
    log.info("===========> " + message);
  }

}

如你所见,这也是我们的主要课程。这是为了使本指南中的项目尽可能简单。主要的逻辑是magazineDeliveryExample方法,它允许我们为两个不同的订阅者处理两个不同的睡眠时间,还可以在发布服务器端(maxStorageInPostOffice)设置缓冲区大小。

然后我们遵循以下步骤:

1. 创建一个SubmissionPublisher,其中包含一个标准线程池(每个订阅服务器拥有一个线程)和选定的缓冲区大小(如果不是,则四舍五入为2的幂)。

2. 创建两个作为参数传递的具有不同睡眠时间和不同名称的订阅服务器,以便在日志中轻松识别它们。

3. 使用20个数字流作为数据源来模拟我们的杂志打印机,我们使用以下几个参数来调用该方法:

1.1 要提供给订阅服务器的项。

1.2 等待每个订户选择该项的最长时间(参数2和3)。

1.3 一个处理程序,用于控制如果给定的订阅者没有获得该项目,会发生什么。在本例中,我们向该订户发送一个错误,通过返回false,我们表示不想重试并再次等待。

4. 当丢弃发生时,offer方法返回一个负数。否则,它将返回最慢的订阅者(lag)要收集的待处理项目的估计最大数量。我们只要把那个号码记录到控制台上。

5. 应用程序周期的最后一部分只是为了避免主线程过早终止。在实际的应用程序中,我们可以用锁存器更好地控制它,但我在这里使用了自己的用例逻辑,首先等待发布者在缓冲区中没有任何内容,然后等待最慢的订阅者接收onComplete信号,这在close()方法中隐式发生。

main()方法使用三种不同的场景调用该逻辑,这些场景模拟了上面解释的真实情况:

订户的速度非常快,因此没有与缓冲相关的问题。

1. 其中一个订户速度非常慢,所以缓冲区开始满了。但是,缓冲区足够大,可以容纳所有的项目,这样订户就不会经历丢弃。

2. 其中一个订户速度非常慢,缓冲区不够大,无法容纳所有项目。在这种情况下,处理程序被多次调用,订阅者并没有收到所有的项目。

3. 请注意,这里还有其他组合可供探索。例如,如果您将常量MAX_SECONDS_TO_WAIT_WHEN_NO_SPACE设置为非常高的数字,您可以尝试使offer方法的行为类似于submit(请参阅下面的部分)。或者你可以看到当两个用户都很慢的时候会发生什么(剧透警报:更多的下降)。

运行应用程序

如果您现在从IDE运行应用程序代码,或者通过打包并从命令行运行应用程序代码,您将看到一个彩色控制台日志,指示这三种情况下发生了什么-我没有告诉您,但是GitHub存储库中还有一个额外的类来执行着色部分(ColorConsoleAppender

您可以阅读日志,了解一切是如何按预期工作的:

  • 第一次迭代进行得很顺利:没有丢弃,而且两个订户都很快,所以它完成得也很快。
  • 由于订阅速度慢,第二次迭代需要更长的时间才能完成,但这不会损失任何杂志。
  • 第三次迭代不太顺利:有好几次超时都过期了,所以速度慢的订阅者最终会收到来自发布者的一些错误和随后的丢弃。那位读者很遗憾会失去出版商的一些好文章。

我们讨论了使用SubmissionPublisher时最灵活的情况,并为订阅者提供了一个超时和一个drop处理程序。我们还可以使用该类的一个更简单的方法:submit,它只接受一个参数:item值。然而,如果我们这样做了,出版商就不会决定做什么。如果订阅服务器的任何缓冲区已满,则发布服务器将阻塞,直到有空间为止,从而影响所有其他订阅服务器。

像在软件开发中一样,对于如何使用flowapi,尤其是SubmissionPublisher,没有任何黑白的方法。但这里有一些建议:

  • 如果您事先知道要发布的项目的估计数以及可能拥有的订户数,则可以分析是否有可能将缓冲区的大小调整为大于最大项目数。
  • 或者,您可以在SubmissionPublisher之上创建一个包装类,并实现一个规则,其中每个发布服务器只允许一个订阅服务器,因此可以避免订阅服务器之间的干扰。尽管在某些情况下,如果发布者和数据源只是一件事,或者数量有限(例如,考虑到数据库的连接池),这可能没有意义。
  • 如果你控制了订户,你可以让他们更聪明、更支持你。如果他们检测到错误或高延迟,他们可以决定取消订阅。

这些灰色区域可能会有很大的不同,这就是为什么在这些情况下,有一个灵活的解决方案,如反应式api会有所帮助。你应该列出你的需求,并检查你是否选择一个简单的解决方案,更聪明的出版商,更聪明的订阅者或两者的结合。

结论

在本文中,我们了解了反应式编程的一些基本原则,并深入到一个示例应用程序中,其中包含一个用例,我希望它能帮助您更好地掌握概念,而不是泛型和无意义的示例。混合拉/推机制的概念现在应该更清楚了,字流和订阅者控制它的重要性也应该是我希望你从这个指南中得到的重要想法。

您不应该急于将此模式应用到实际项目中,但当您知道它可能会在将来发生时,您会发现一个完全适合反应式编程的问题:这样您将获得很多好处。将函数式编程和反应式编程很好地结合在一起的代码项目看起来非常适合阅读(从而理解/维护)和使用。他们的表现也更好;您了解了混合拉/推技术如何带来减少资源消耗的好处。

原文地址:https://thepracticaldeveloper.com/reactive-programming-java-9-flow/

  
 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2088.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册