什么是RSocket?
RSocket是一种用于分布式应用程序的二进制点对点通信协议。从这个意义上说,它提供了一种替代HTTP等其他协议的方法。
RSocket和其他协议之间的全面比较超出了本文的范围。相反,我们将关注RSocket的一个关键特性:它的交互模型。
RSocket提供了四种交互模型。考虑到这一点,我们将用一个例子来探讨每一个问题。
Maven依赖项
对于我们的示例,RSocket只需要两个直接依赖项:
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>0.11.13</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>0.11.13</version>
</dependency>
rsocket core
和rsocket transport netty
依赖项在Maven Central上提供。
一个重要的注意事项是RSocket库经常使用反应流。本文中使用了Flux
和Mono
类,因此对它们的基本理解将很有帮助。
服务器设置
首先,让我们创建服务器类:
public class Server {
private final Disposable server;
public Server() {
this.server = RSocketFactory.receive()
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
.transport(TcpServerTransport.create("localhost", TCP_PORT))
.start()
.subscribe();
}
public void dispose() {
this.server.dispose();
}
private class RSocketImpl extends AbstractRSocket {}
}
接下来,要启动服务器,我们只需要实例化它:
Server server = new Server();
一个服务器实例可以处理多个连接。因此,只有一个服务器实例支持我们的所有示例。
完成后,dispose
方法将停止服务器并释放TCP端口。
交互作用模型
请求/响应
RSocket提供了一个请求/响应模型——每个请求接收一个响应。
对于这个模型,我们将创建一个简单的服务,将消息返回给客户机。
让我们首先向AbstractRSocket
的扩展RSocketImpl
添加一个方法:
@Override
public Mono<Payload> requestResponse(Payload payload) {
try {
return Mono.just(payload); // reflect the payload back to the sender
} catch (Exception x) {
return Mono.error(x);
}
}
requestResponse
方法为每个请求返回一个结果,我们可以通过Mono<Payload>
响应类型看到这一点。
Payload
是包含消息内容和元数据的类。所有的交互模型都使用它。有效负载的内容是二进制的,但是有一些方便的方法支持基于字符串的内容。
接下来,我们可以创建客户端类:
public class ReqResClient {
private final RSocket socket;
public ReqResClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public String callBlocking(String string) {
return socket
.requestResponse(DefaultPayload.create(string))
.map(Payload::getDataUtf8)
.block();
}
public void dispose() {
this.socket.dispose();
}
}
客户端使用RSocketFactory.connect()
方法启动与服务器的套接字连接。我们使用套接字上的requestResponse
方法向服务器发送有效负载。
我们的负载包含传递到客户端的字符串。当Mono<Payload>
响应到达时,我们可以使用getDataUtf8()
方法访问响应的字符串内容。
最后,我们可以运行集成测试来查看请求/响应的实际情况。我们将向服务器发送一个字符串,并验证返回的字符串是否相同:
@Test
public void whenSendingAString_thenRevceiveTheSameString() {
ReqResClient client = new ReqResClient();
String string = "Hello RSocket";
assertEquals(string, client.callBlocking(string));
client.dispose();
}
Fire-and-Forget
使用fire-and-forget
模型,客户端将不会收到来自服务器的响应。
在本例中,客户机将以50ms的间隔向服务器发送模拟测量。服务器将发布测量结果。
让我们在RSocketImpl
类中向服务器添加一个fire-and-forget
处理程序:
@Override
public Mono<Void> fireAndForget(Payload payload) {
try {
dataPublisher.publish(payload); // forward the payload
return Mono.empty();
} catch (Exception x) {
return Mono.error(x);
}
}
这个处理程序看起来非常类似于请求/响应处理程序。但是,fireAndForget
返回Mono<Void>
而不是Mono<Payload>
。
dataPublisher
是org.reactivestreams.Publisher
的实例. 因此,它使有效负载对订户可用。我们将在请求/流示例中使用它。
接下来,我们将创建fire-and-forget
客户端:
public class FireNForgetClient {
private final RSocket socket;
private final List<Float> data;
public FireNForgetClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
/** Send binary velocity (float) every 50ms */
public void sendData() {
data = Collections.unmodifiableList(generateData());
Flux.interval(Duration.ofMillis(50))
.take(data.size())
.map(this::createFloatPayload)
.flatMap(socket::fireAndForget)
.blockLast();
}
// ...
}
套接字设置与以前完全相同。
sendData()
方法使用流量流发送多条消息。对于每条消息,我们调用socket::fireAndForget
。
我们需要为每条消息订阅Mono<Void>
响应。如果我们忘记订阅,那么socket::fireAndForget
将不会执行。
flatMap
操作符确保将Void
响应传递给订阅者,而blockLast
操作符充当订阅者。
我们要等到下一节才开始测试。此时,我们将创建一个请求/流客户机来接收fire-and-forget
客户机推送的数据。
请求/流
在request/stream 请求/流模型中,单个请求可以接收多个响应。看到这一点的行动,我们可以建立在fire-and-forget
的例子。为此,我们请求一个流来检索上一节中发送的度量。
与前面一样,让我们首先向服务器上的RSocketImpl
添加一个新的侦听器:
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.from(dataPublisher);
}
requestStream
处理程序返回一个Flux<Payload>
流。正如我们在上一节中所回顾的,fireAndForget
处理程序将传入的数据发布到dataPublisher
。现在,我们将使用与事件源相同的dataPublisher
创建一个Flux
流。通过这样做,测量数据将从fire-and-forget
客户端异步流到请求/流客户端。
接下来创建请求/流客户端:
public class ReqStreamClient {
private final RSocket socket;
public ReqStreamClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
}
public Flux<Float> getDataStream() {
return socket
.requestStream(DefaultPayload.create(DATA_STREAM_NAME))
.map(Payload::getData)
.map(buf -> buf.getFloat())
.onErrorReturn(null);
}
public void dispose() {
this.socket.dispose();
}
}
我们与服务器的连接方式与以前的客户机相同。
在getDataStream()
中,我们使用socket.requestStream()
从服务器接收Flux<Payload>
流。从该流中,我们从二进制数据中提取浮点值。最后,流被返回给调用者,允许调用者订阅流并处理结果。
现在让我们测试一下。我们将验证从火的往返路线,并忘记请求/流。
我们可以断言每个值的接收顺序与发送顺序相同。然后,我们可以断言接收到的值与发送的值相同:
@Test
public void whenSendingStream_thenReceiveTheSameStream() {
FireNForgetClient fnfClient = new FireNForgetClient();
ReqStreamClient streamClient = new ReqStreamClient();
List<Float> data = fnfClient.getData();
List<Float> dataReceived = new ArrayList<>();
Disposable subscription = streamClient.getDataStream()
.index()
.subscribe(
tuple -> {
assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
dataReceived.add(tuple.getT2());
},
err -> LOG.error(err.getMessage())
);
fnfClient.sendData();
// ... dispose client & subscription
assertEquals("Wrong data count received", data.size(), dataReceived.size());
}
Channel
Channel
模型提供双向通信。在这个模型中,消息流在两个方向上异步流动。
让我们创建一个简单的游戏模拟来测试这一点。在这个游戏中,通道的每一方都将成为一个玩家。当游戏运行时,这些玩家将以随机的时间间隔向对方发送消息。对方会对信息做出反应。
首先,我们将在服务器上创建处理程序。与前面一样,我们在RSocketImpl
中添加了:
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.subscribe(gameController::processPayload);
return Flux.from(gameController);
}
requestChannel
处理程序具有用于输入和输出的有效负载流。Publisher<Payload>
输入参数是从客户机接收的有效负载流。当它们到达时,这些有效负载被传递给gameController::processPayload
函数。
作为响应,我们将不同的流量流返回给客户机。这个流是从我们的gameController
创建的,它也是一个发布者。
以下是GameController
类的摘要:
public class GameController implements Publisher<Payload> {
@Override
public void subscribe(Subscriber<? super Payload> subscriber) {
// send Payload messages to the subscriber at random intervals
}
public void processPayload(Payload payload) {
// react to messages from the other player
}
}
当GameController
收到订户时,它开始向该订户发送消息。
接下来,让我们创建客户端:
public class ChannelClient {
private final RSocket socket;
private final GameController gameController;
public ChannelClient() {
this.socket = RSocketFactory.connect()
.transport(TcpClientTransport.create("localhost", TCP_PORT))
.start()
.block();
this.gameController = new GameController("Client Player");
}
public void playGame() {
socket.requestChannel(Flux.from(gameController))
.doOnNext(gameController::processPayload)
.blockLast();
}
public void dispose() {
this.socket.dispose();
}
}
正如我们在前面的示例中看到的,客户机以与其他客户机相同的方式连接到服务器。
客户端创建自己的GameController
实例。
我们使用socket.requestChannel()
将有效负载流发送到服务器。服务器用自己的有效负载流进行响应。
当从服务器接收到有效负载时,我们将它们传递给gameController::processPayload
处理程序。
在我们的游戏模拟中,客户端和服务器是彼此的镜像。即,每一方正在发送有效负载流并且从另一端接收有效负载流。
流独立运行,没有同步。
最后,让我们在测试中运行模拟:
@Test
public void whenRunningChannelGame_thenLogTheResults() {
ChannelClient client = new ChannelClient();
client.playGame();
client.dispose();
}
总结
在这篇介绍性文章中,我们探讨了RSocket提供的交互模型。示例的完整源代码可以在我们的Github存储库中找到:https://github.com/eugenp/tutorials/tree/master/rsocket。
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1712.html
暂无评论