Java9反应流允许我们实现非阻塞异步流处理。这是将反应式编程模型应用于核心java编程的一个重要步骤。
RxJava和Akka流是反应流的流行实现。现在Java9通过java.util.concurrent.Flow
API。
Java 9 Reactive Streams
反应流是关于流的异步处理,所以应该有一个发布者和一个订阅者。发布者发布数据流,订阅者使用数据。
有时我们必须在发布者和订阅者之间转换数据。处理器是位于最终发布者和订阅者之间的实体,用于转换从发布者接收的数据,以便订阅者能够理解它。我们可以有一个处理器链。
从上图可以很清楚地看出,处理器既是订阅者又是发布者。
Java 9流API
Java9 Flow API实现了反应流规范。FlowAPI是迭代器和观察者模式的组合。迭代器工作在拉模型上,应用程序从源中拉项目,而观察者工作在推模型上,当项目从源推送到应用程序时,它会做出反应。
Java9FlowAPI订阅服务器在订阅发布服务器时可以请求N个项目。然后将项目从发布服务器推送到订阅服务器,直到没有更多的项目可供推,或者出现错误为止。
Java9 Flow API类和接口
让我们快速了解一下流API类和接口。
1. java.util.concurrent.Flow
:这是流API的主要类。这个类封装了流API的所有重要接口。这是最后一节课,我们不能延长。
2. java.util.concurrent.Flow.Publisher
:这是一个功能接口,每个发布者都必须实现其subscribe
方法,才能添加给定的订阅者以接收消息。
3. java.util.concurrent.Flow.Subscriber
:每个订户都必须实现此接口。订阅服务器中的方法是按严格的顺序调用的。此接口中有四种方法:
onSubscribe
:这是在订阅服务器订阅发布服务器接收消息时调用的第一个方法。通常我们调用订阅.请求开始从处理器接收项目。onNext
:当从publisher接收到一个项目时,就会调用这个方法,在这里我们实现业务逻辑来处理流,然后从publisher请求更多的数据。onError
:当发生不可恢复的错误时调用此方法,我们可以在此方法中执行清理tak,例如关闭数据库连接。onComplete
:这类似于finally方法,当publisher没有生成其他项并且publisher关闭时调用它。我们可以用它来发送流处理成功的通知。
4. java.util.concurrent.Flow.Subscription
:用于在发布服务器和订阅服务器之间创建异步非阻塞链接。订阅者调用其请求方法从发布者请求项目。它还有取消订阅的方法,即关闭发布者和订阅者之间的链接。
5. java.util.concurrent.Flow.Processor
:此接口扩展发布服务器和订阅服务器,用于在发布服务器和订阅服务器之间转换消息。
6. java.util.concurrent.SubmissionPublisher
:发布服务器实现,它异步向当前订阅服务器发布已提交的项目,直到其关闭。它使用Executor
框架,我们将在反应流示例中使用该类来添加订户,然后将项目提交给订户。
Java9反应流示例
让我们从一个简单的示例开始,在这个示例中,我们将实现Flow API订户接口,并使用SubmissionPublisher
创建publisher
和发送消息。
流数据
假设我们有一个Employee
类,它将用于创建从发布者发送到订阅者的流消息。
package com.journaldev.reactive.beans;
public class Employee {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Employee(int i, String s) {
this.id = i;
this.name = s;
}
public Employee() {
}
@Override
public String toString() {
return "[id="+id+",name="+name+"]";
}
}
我们还拥有一个实用程序类,用于为我们的示例创建员工列表。
package com.journaldev.reactive_streams;
import java.util.ArrayList;
import java.util.List;
import com.journaldev.reactive.beans.Employee;
public class EmpHelper {
public static List<Employee> getEmps() {
Employee e1 = new Employee(1, "Pankaj");
Employee e2 = new Employee(2, "David");
Employee e3 = new Employee(3, "Lisa");
Employee e4 = new Employee(4, "Ram");
Employee e5 = new Employee(5, "Anupam");
List<Employee> emps = new ArrayList<>();
emps.add(e1);
emps.add(e2);
emps.add(e3);
emps.add(e4);
emps.add(e5);
return emps;
}
}
订阅者
package com.journaldev.reactive_streams;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import com.journaldev.reactive.beans.Employee;
public class MySubscriber implements Subscriber<Employee> {
private Subscription subscription;
private int counter = 0;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscribed");
this.subscription = subscription;
this.subscription.request(1); //requesting data from publisher
System.out.println("onSubscribe requested 1 item");
}
@Override
public void onNext(Employee item) {
System.out.println("Processing Employee "+item);
counter++;
this.subscription.request(1);
}
@Override
public void onError(Throwable e) {
System.out.println("Some error happened");
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("All Processing Done");
}
public int getCounter() {
return counter;
}
}
Subscription
变量以保留引用,以便可以在onNext
方法中发出请求。counter
变量来保持已处理项目数的计数,注意它的值在onNext
方法中是增加的。这将在我们的main
方法中使用,以便在结束主线程之前等待执行完成。- 在
onSubscribe
方法中调用订阅请求以开始处理。还请注意,在处理项目之后,onNext
方法会再次调用它,要求发布者处理下一个项目。 onError
和onComplete
在这里没有太多内容,但在实际场景中,它们应该用于在发生错误时执行纠正措施,或者在处理成功完成时执行资源清理。
反应流试验程序
我们将使用SubmissionPublisher
作为我们的示例的发布者,所以让我们看看用于我们的反应流实现的测试程序。
package com.journaldev.reactive_streams;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import com.journaldev.reactive.beans.Employee;
public class MyReactiveApp {
public static void main(String args[]) throws InterruptedException {
// Create Publisher
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
// Register Subscriber
MySubscriber subs = new MySubscriber();
publisher.subscribe(subs);
List<Employee> emps = EmpHelper.getEmps();
// Publish items
System.out.println("Publishing Items to Subscriber");
emps.stream().forEach(i -> publisher.submit(i));
// logic to wait till processing of all messages are over
while (emps.size() != subs.getCounter()) {
Thread.sleep(10);
}
// close the Publisher
publisher.close();
System.out.println("Exiting the app");
}
}
上面代码中最重要的部分是publisher
的subscribe
和submit
方法调用。我们应该始终关闭发布服务器以避免任何内存泄漏。
当执行上述程序时,我们将得到以下输出。
Subscribed
Publishing Items to Subscriber
onSubscribe requested 1 item
Processing Employee [id=1,name=Pankaj]
Processing Employee [id=2,name=David]
Processing Employee [id=3,name=Lisa]
Processing Employee [id=4,name=Ram]
Processing Employee [id=5,name=Anupam]
Exiting the app
All Processing Done
注意,如果main方法在处理所有项之前没有等待的逻辑,那么我们将得到不需要的结果。
消息转换示例
处理器用于在发布者和订阅者之间转换消息。假设我们有另一个订阅者需要处理不同类型的消息。假设这个新消息类型是Freelancer
。
package com.journaldev.reactive.beans;
public class Freelancer extends Employee {
private int fid;
public int getFid() {
return fid;
}
public void setFid(int fid) {
this.fid = fid;
}
public Freelancer(int id, int fid, String name) {
super(id, name);
this.fid = fid;
}
@Override
public String toString() {
return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]";
}
}
我们有一个新的订户消费Freelancer
流数据。
package com.journaldev.reactive_streams;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import com.journaldev.reactive.beans.Freelancer;
public class MyFreelancerSubscriber implements Subscriber<Freelancer> {
private Subscription subscription;
private int counter = 0;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscribed for Freelancer");
this.subscription = subscription;
this.subscription.request(1); //requesting data from publisher
System.out.println("onSubscribe requested 1 item for Freelancer");
}
@Override
public void onNext(Freelancer item) {
System.out.println("Processing Freelancer "+item);
counter++;
this.subscription.request(1);
}
@Override
public void onError(Throwable e) {
System.out.println("Some error happened in MyFreelancerSubscriber");
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("All Processing Done for MyFreelancerSubscriber");
}
public int getCounter() {
return counter;
}
}
Processor
其中最重要的部分是Processor
接口的实现。因为我们想使用SubmissionPublisher
,所以我们会扩展它并在任何适用的地方使用它。
package com.journaldev.reactive_streams;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import com.journaldev.reactive.beans.Employee;
import com.journaldev.reactive.beans.Freelancer;
public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> {
private Subscription subscription;
private Function<Employee,Freelancer> function;
public MyProcessor(Function<Employee,Freelancer> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Employee emp) {
submit((Freelancer) function.apply(emp));
subscription.request(1);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
函数将用于将Employee
对象转换为Freelancer
对象。
我们将在onNext方法中将传入的Employee
消息转换为Freelancer
消息,然后使用SubmissionPublisher
submit
方法将其发送给订阅者。
由于处理器同时作为订阅者和发布者工作,我们可以在终端发布者和订阅者之间创建一个处理器链。
消息转换测试
package com.journaldev.reactive_streams;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import com.journaldev.reactive.beans.Employee;
import com.journaldev.reactive.beans.Freelancer;
public class MyReactiveAppWithProcessor {
public static void main(String[] args) throws InterruptedException {
// Create End Publisher
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
// Create Processor
MyProcessor transformProcessor = new MyProcessor(s -> {
return new Freelancer(s.getId(), s.getId() + 100, s.getName());
});
//Create End Subscriber
MyFreelancerSubscriber subs = new MyFreelancerSubscriber();
//Create chain of publisher, processor and subscriber
publisher.subscribe(transformProcessor); // publisher to processor
transformProcessor.subscribe(subs); // processor to subscriber
List<Employee> emps = EmpHelper.getEmps();
// Publish items
System.out.println("Publishing Items to Subscriber");
emps.stream().forEach(i -> publisher.submit(i));
// Logic to wait for messages processing to finish
while (emps.size() != subs.getCounter()) {
Thread.sleep(10);
}
// Closing publishers
publisher.close();
transformProcessor.close();
System.out.println("Exiting the app");
}
}
阅读程序中的注释以正确理解它,最重要的变化是创建生产者-处理器-订户链。当执行上述程序时,我们将得到以下输出。
Subscribed for Freelancer
Publishing Items to Subscriber
onSubscribe requested 1 item for Freelancer
Processing Freelancer [id=1,name=Pankaj,fid=101]
Processing Freelancer [id=2,name=David,fid=102]
Processing Freelancer [id=3,name=Lisa,fid=103]
Processing Freelancer [id=4,name=Ram,fid=104]
Processing Freelancer [id=5,name=Anupam,fid=105]
Exiting the app
All Processing Done for MyFreelancerSubscriber
Done
取消订阅
我们可以使用订阅取消方法来停止在订阅服务器中接收消息。请注意,如果取消订阅,则订阅服务器将不会收到onComplete
或onError
信号。
下面是一个示例代码,其中订阅者仅使用3条消息,然后取消订阅。
@Override
public void onNext(Employee item) {
System.out.println("Processing Employee "+item);
counter++;
if(counter==3) {
this.subscription.cancel();
return;
}
this.subscription.request(1);
}
注意,在这种情况下,在处理所有消息之前停止主线程的逻辑将进入无限循环。我们可以为这个场景添加一些额外的逻辑,如果订阅服务器已经停止处理或取消订阅,则可以查找一些全局变量。
Back Pressure 背压
当publisher
以比订阅者快得多的速度生成消息时,就会产生反压力。FlowAPI不提供任何机制来发出背压信号或处理背压。但是,我们可以设计自己的策略来处理它,例如微调用户或降低消息产生率。您可以阅读RxJava如何处理背压。
小结
Java9FlowAPI是朝着反应式编程和创建异步无阻塞应用程序的方向发展的一个很好的方向。但是,只有当所有系统API都支持时,才有可能创建真正的反应式应用程序。
文中完整代码github地址:https://github.com/journaldev/journaldev/tree/master/Java-9/Java9-Reactive-Streams
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1742.html
暂无评论