3年前 (2021-05-19)  Java系列 |   抢沙发  345 
文章评分 0 次,平均分 0.0

 

RxJava和java9 FlowAPI的区别

JavaFlowAPI是在Java9中作为反应流规范Reactive Stream的实现而引入的。

在本文中,我们将首先研究反应流Reactive Stream。然后,我们将了解它与RxJava和flowapi的关系。

什么是Reactive Stream?

Reactive Manifesto引入了Reactive Streams,以指定具有非阻塞背压的异步流处理的标准

反应流规范的范围是定义一组最小的接口来实现这些目的:

  • org.reactivestreams.Publisher是一个数据提供者,它根据订阅者的需求向订阅者发布数据
  • org.reactivestreams.Subscriber是数据的使用者,它可以在订阅发布服务器后接收数据
  • org.reactivestreams.Subscription是在发布服务器接受订阅服务器时创建的
  • org.reactivestreams.Processor既是订阅者又是发布者—它订阅发布者,处理数据,然后将处理后的数据传递给订阅者

FlowAPI源于规范。RxJava先于它,但是从2.0开始,RxJava也支持这个规范。

我们将深入讨论这两个问题,但首先,让我们看一个实际的用例。

反应流用例

在本文中,我们将使用实时流视频服务作为我们的用例。

与点播视频流相反,实时流视频不依赖于消费者。因此,服务器以自己的速度发布流,用户有责任进行调整。

在最简单的形式中,我们的模型由一个视频流发布者和一个作为订户的视频播放器组成。

让我们实现视频帧作为我们的数据项:

public class VideoFrame {
    private long number;
    // additional data fields

    // constructor, getters, setters
}

然后让我们逐一介绍我们的Flow API和RxJava实现。

用Flow API实现

jdk9中的Flow API对应于反应流规范。使用FlowAPI,如果应用程序最初请求N个项目,那么发布者最多将N个项目推送到订阅服务器。

Flow API接口都在java.util.concurrent.Flow接口中。它们在语义上等价于各自对应的反应流。

让我们实现VideoStreamServer作为视频帧的发布者。

public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
  
    public VideoStreamServer() {
        super(Executors.newSingleThreadExecutor(), 5);
    }
}

我们从SubmissionPublisher扩展了VideoStreamServer,而不是直接实现Flow::PublisherSubmissionPublisherFlow::Publisher的JDK实现,用于与订阅者进行异步通信,因此它允许我们的VideoStreamServer以自己的速度发送。

此外,它还有助于背压和缓冲区处理,因为在调用SubmissionPublisher::subscribe时,它会创建一个BufferedSubscription实例,然后将新订阅添加到其订阅链中。BufferedSubscription可以将发布的项目缓冲到SubmissionPublisher#maxBufferCapacity

现在让我们定义VideoPlayer,它消耗一个视频帧流。因此它必须实现Flow::Subscriber

public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
   
    Flow.Subscription subscription = null;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(VideoFrame item) {
        log.info("play #{}" , item.getNumber());
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("There is an error in video streaming:{}" , throwable.getMessage());

    }

    @Override
    public void onComplete() {
        log.error("Video has ended");
    }
}

VideoPlayer订阅VideoStreamServer,成功订阅后调用VideoPlayer::onSubscribe方法,请求一帧。VideoPlayer::onNext接收帧并请求新帧。请求的帧数取决于用例和订户实现。

最后,让我们总结一下:

VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());

// submit video frames

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
    streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
        subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber()
        + " droped because of backpressure"));
        return true;
    });
}, 0, 1, TimeUnit.MILLISECONDS);

sleep(1000);

用RxJava实现

RxJava是ReactiveX的Java实现。ReactiveX(或reactiveextensions)项目旨在提供一个反应式编程概念。它是观察者模式、迭代器模式和函数式编程的组合。

RxJava的最新主要版本是3.x。RxJava从2.x版开始就支持反应流,它有一个可流动的基类,但是它比有几个基类(如可流动的、可观察的、单一的、可完成的)的反应流更重要。

作为反应流顺应性组件的可流动性是0到N个项目的流,带有背压处理。Flowable从反应流扩展Publisher。因此,许多RxJava运营商直接接受Publisher,并允许与其他反应流实现直接互操作。

现在,让我们制作一个视频流生成器,它是一个无限的惰性流:

Stream<VideoFrame> videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> {
    // sleep for 1ms;
    return new VideoFrame(videoFrame.getNumber() + 1);
});

然后我们定义一个可流动实例,在单独的线程上生成帧:

Flowable
  .fromStream(videoStream)
  .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))

需要注意的是,无限流对我们来说已经足够了,但是如果我们需要更灵活的方法来生成流,那么Flowable.create是一个不错的选择。

Flowable
  .create(new FlowableOnSubscribe<VideoFrame>() {
      AtomicLong frame = new AtomicLong();
      @Override
      public void subscribe(@NonNull FlowableEmitter<VideoFrame> emitter) {
          while (true) {
              emitter.onNext(new VideoFrame(frame.incrementAndGet()));
              //sleep for 1 ms to simualte delay
          }
      }
  }, /* Set Backpressure Strategy Here */)

然后,在下一步,VideoPlayer订阅这个Flowable并在一个单独的线程上观察项目。

videoFlowable
  .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
  .subscribe(item -> {
      log.info("play #" + item.getNumber());
      // sleep for 30 ms to simualate frame display
  });

最后,我们将配置背压策略。如果我们想在帧丢失的情况下停止视频,因此我们必须在缓冲区满时使用BackpressureOverflowStrategy::ERROR

Flowable
  .fromStream(videoStream)
  .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
  .onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR)
  .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
  .subscribe(item -> {
       log.info("play #" + item.getNumber()); 
       // sleep for 30 ms to simualate frame display 
  });

RxJava与Flow API的比较

即使在这两个简单的实现中,我们也可以看到RxJava的API是多么丰富,特别是对于缓冲区管理、错误处理和反压力策略。它的fluent API为我们提供了更多的选项和更少的代码行。现在让我们考虑更复杂的情况。

假设我们的播放器不能显示没有编解码器的视频帧。因此,使用Flow API,我们需要实现一个处理器来模拟编解码器,并将其置于服务器和播放器之间。使用RxJava,我们可以使用Flowable::flatMapFlowable::map来完成。

或者让我们设想一下,我们的播放机也将播放现场翻译音频,因此我们必须将来自不同出版商的视频流和音频流结合起来。对于RxJava,我们可以使用Flowable::combinelatetest,但是对于Flow API,这并不是一个容易的任务。

尽管如此,我们还是可以编写一个定制的处理器来订阅这两个流,并将合并后的数据发送给我们的视频播放器。然而,实施是一个令人头痛的问题。

为什么选择Flow API?

在这一点上,我们可能会有一个问题,Flow API背后的哲学是什么?

如果我们在JDK中搜索Flow API用法,我们可以在java.net.httpjdk.internal.net.http中找到一些东西。

此外,我们可以在reactor项目或reactive stream包中找到适配器。例如,org.reactivestreams.FlowAdapters具有将Flow API接口转换为反应流接口的方法,反之亦然。因此,它有助于Flow API和具有反应流支持的库之间的互操作性。

所有这些事实都有助于我们理解Flow API的用途:它是在JDK中创建的一组反应式规范接口,不依赖第三方。此外,Java希望Flow API被接受为反应式规范的标准接口,并在JDK或其他实现中间件和实用程序的反应式规范的基于Java的库中使用。

完整代码地址:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-9-new-features

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1931.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册