JavaFlowAPI是在Java9中作为反应流规范Reactive Stream的实现而引入的。
在本文中,我们将首先研究反应流Reactive Stream。然后,我们将了解它与RxJava和flowapi的关系。
什么是Reactive Stream?
Reactive Manifesto引入了Reactive Streams,以指定具有非阻塞背压的异步流处理的标准。
反应流规范的范围是定义一组最小的接口来实现这些目的:
- org.reactivestreams.Publisher是一个数据提供者,它根据订阅者的需求向订阅者发布数据
- org.reactivestreams.Subscriber是数据的使用者,它可以在订阅发布服务器后接收数据
- org.reactivestreams.Subscription是在发布服务器接受订阅服务器时创建的
- org.reactivestreams.Processor既是订阅者又是发布者—它订阅发布者,处理数据,然后将处理后的数据传递给订阅者
FlowAPI源于规范。RxJava先于它,但是从2.0开始,RxJava也支持这个规范。
我们将深入讨论这两个问题,但首先,让我们看一个实际的用例。
反应流用例
在本文中,我们将使用实时流视频服务作为我们的用例。
与点播视频流相反,实时流视频不依赖于消费者。因此,服务器以自己的速度发布流,用户有责任进行调整。
在最简单的形式中,我们的模型由一个视频流发布者和一个作为订户的视频播放器组成。
让我们实现视频帧作为我们的数据项:
public class VideoFrame {
private long number;
// additional data fields
// constructor, getters, setters
}
然后让我们逐一介绍我们的Flow API和RxJava实现。
用Flow API实现
jdk9中的Flow API对应于反应流规范。使用FlowAPI,如果应用程序最初请求N个项目,那么发布者最多将N个项目推送到订阅服务器。
Flow API接口都在java.util.concurrent.Flow
接口中。它们在语义上等价于各自对应的反应流。
让我们实现VideoStreamServer
作为视频帧的发布者。
public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
public VideoStreamServer() {
super(Executors.newSingleThreadExecutor(), 5);
}
}
我们从SubmissionPublisher
扩展了VideoStreamServer
,而不是直接实现Flow::Publisher
。SubmissionPublisher
是Flow::Publisher
的JDK实现,用于与订阅者进行异步通信,因此它允许我们的VideoStreamServer
以自己的速度发送。
此外,它还有助于背压和缓冲区处理,因为在调用SubmissionPublisher::subscribe
时,它会创建一个BufferedSubscription
实例,然后将新订阅添加到其订阅链中。BufferedSubscription
可以将发布的项目缓冲到SubmissionPublisher#maxBufferCapacity
。
现在让我们定义VideoPlayer
,它消耗一个视频帧流。因此它必须实现Flow::Subscriber
。
public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
Flow.Subscription subscription = null;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(VideoFrame item) {
log.info("play #{}" , item.getNumber());
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.error("There is an error in video streaming:{}" , throwable.getMessage());
}
@Override
public void onComplete() {
log.error("Video has ended");
}
}
VideoPlayer
订阅VideoStreamServer
,成功订阅后调用VideoPlayer::onSubscribe
方法,请求一帧。VideoPlayer::onNext
接收帧并请求新帧。请求的帧数取决于用例和订户实现。
最后,让我们总结一下:
VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());
// submit video frames
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber()
+ " droped because of backpressure"));
return true;
});
}, 0, 1, TimeUnit.MILLISECONDS);
sleep(1000);
用RxJava实现
RxJava是ReactiveX的Java实现。ReactiveX(或reactiveextensions)项目旨在提供一个反应式编程概念。它是观察者模式、迭代器模式和函数式编程的组合。
RxJava的最新主要版本是3.x。RxJava从2.x版开始就支持反应流,它有一个可流动的基类,但是它比有几个基类(如可流动的、可观察的、单一的、可完成的)的反应流更重要。
作为反应流顺应性组件的可流动性是0到N个项目的流,带有背压处理。Flowable
从反应流扩展Publisher
。因此,许多RxJava运营商直接接受Publisher
,并允许与其他反应流实现直接互操作。
现在,让我们制作一个视频流生成器,它是一个无限的惰性流:
Stream<VideoFrame> videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> {
// sleep for 1ms;
return new VideoFrame(videoFrame.getNumber() + 1);
});
然后我们定义一个可流动实例,在单独的线程上生成帧:
Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
需要注意的是,无限流对我们来说已经足够了,但是如果我们需要更灵活的方法来生成流,那么Flowable.create
是一个不错的选择。
Flowable
.create(new FlowableOnSubscribe<VideoFrame>() {
AtomicLong frame = new AtomicLong();
@Override
public void subscribe(@NonNull FlowableEmitter<VideoFrame> emitter) {
while (true) {
emitter.onNext(new VideoFrame(frame.incrementAndGet()));
//sleep for 1 ms to simualte delay
}
}
}, /* Set Backpressure Strategy Here */)
然后,在下一步,VideoPlayer
订阅这个Flowable
并在一个单独的线程上观察项目。
videoFlowable
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});
最后,我们将配置背压策略。如果我们想在帧丢失的情况下停止视频,因此我们必须在缓冲区满时使用BackpressureOverflowStrategy::ERROR
。
Flowable
.fromStream(videoStream)
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
log.info("play #" + item.getNumber());
// sleep for 30 ms to simualate frame display
});
RxJava与Flow API的比较
即使在这两个简单的实现中,我们也可以看到RxJava的API是多么丰富,特别是对于缓冲区管理、错误处理和反压力策略。它的fluent API为我们提供了更多的选项和更少的代码行。现在让我们考虑更复杂的情况。
假设我们的播放器不能显示没有编解码器的视频帧。因此,使用Flow API,我们需要实现一个处理器来模拟编解码器,并将其置于服务器和播放器之间。使用RxJava,我们可以使用Flowable::flatMap
或Flowable::map
来完成。
或者让我们设想一下,我们的播放机也将播放现场翻译音频,因此我们必须将来自不同出版商的视频流和音频流结合起来。对于RxJava,我们可以使用Flowable::combinelatetest
,但是对于Flow API,这并不是一个容易的任务。
尽管如此,我们还是可以编写一个定制的处理器来订阅这两个流,并将合并后的数据发送给我们的视频播放器。然而,实施是一个令人头痛的问题。
为什么选择Flow API?
在这一点上,我们可能会有一个问题,Flow API背后的哲学是什么?
如果我们在JDK中搜索Flow API用法,我们可以在java.net.http
和jdk.internal.net.http
中找到一些东西。
此外,我们可以在reactor项目或reactive stream包中找到适配器。例如,org.reactivestreams.FlowAdapters
具有将Flow API接口转换为反应流接口的方法,反之亦然。因此,它有助于Flow API和具有反应流支持的库之间的互操作性。
所有这些事实都有助于我们理解Flow API的用途:它是在JDK中创建的一组反应式规范接口,不依赖第三方。此外,Java希望Flow API被接受为反应式规范的标准接口,并在JDK或其他实现中间件和实用程序的反应式规范的基于Java的库中使用。
完整代码地址:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-9-new-features
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1931.html
暂无评论