使用RSocket,客户机和服务器之间的界限是模糊的。使用Rsocket,服务器可以向客户端发送消息,客户端可以像服务器一样响应这些请求。
实际上,RSocket文档不使用术语“client”或“server”,而是使用术语“requester”和“responder”。在RSocket中,任何组件都可以充当请求者,任何组件都可以充当响应者,甚至可以同时充当两者。在RSocket中,请求者和响应者之间的所有来回通信都是通过一个“双向”连接进行的。
今天,您将通过编程rsocket客户机来响应来自服务器的请求,从而利用这些特性。在服务器端代码中,您将监听客户端连接事件,当发生连接事件时,您将新客户端存储在已连接客户端的列表中。您还将要求每个客户机在其连接处于活动状态时将遥测消息流式传输回服务器。
您可以按照下面的说明编写代码。代码也可以从GitHub下载:
https://github.com/benwilcock/spring-rsocket-demo
步骤1:更新Spring Boot和RSocket
首先,做点家务。spring boot和RSocket java库都是最近更新的。这些更新包括常见的一轮错误修复、增强和弃用,因此升级符合我们的利益。
在Maven pom.xml的<parent>
部分中,将spring-boot-starter-parent
更改为version2.3.0.RELEASE
,如下所示。您需要做两次,因为您有两个独立的项目-rsocket client
和rsocket server
。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>COPY
要刷新代码,请在两个项目的根文件夹中运行以下命令:
./mvnw clean compile
现在您可以继续进行编码任务。
步骤2:向客户端添加请求-响应消息处理程序
客户机需要能够响应来自服务器的消息。在rsocket客户端项目的RSocketShellClient.java
中,添加一个名为ClientHandler
的新内部类,如下所示:
@Slf4j
class ClientHandler {
@MessageMapping("client-status")
public Flux<String> statusUpdate(String status) {
log.info("Connection {}", status);
return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
}
}
此类包含一个名为statusUpdate()
的方法,该方法用@MessageMapping
注释修饰,类似于rsocket服务器项目中的注释。客户机使用这个类和这个方法来捕获和响应来自服务器的请求。响应本身就是一个消息流。每5秒,客户机就会告诉服务器当前的可用内存。可以将其视为客户端遥测数据。
要使其工作,必须使用RSocket连接“注册”此类。您将在下一节中进行此操作。
步骤3:在客户端的构造函数中注册ClientHandler
在客户端可以响应来自服务器的消息之前,它必须用RSocket连接注册ClientHandler
。下面列出了修改后的构造函数代码。请注意对构造函数的方法签名所做的更改,以添加RSocketStrategies变量。Spring将这个变量提供给构造函数。用下面列出的新代码替换旧的构造函数代码。
public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
// (1)
String client = UUID.randomUUID().toString();
log.info("Connecting using client ID: {}", client);
// (2)
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
// (3)
this.rsocketRequester = rsocketRequesterBuilder
.setupRoute("shell-client")
.setupData(client)
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", 7000)
.block();
// (4)
this.rsocketRequester.rsocket()
.onClose()
.doOnError(error -> log.warn("Connection CLOSED"))
.doFinally(consumer -> log.info("Client DISCONNECTED"))
.subscribe();
}
在上面的代码中,首先生成并存储标识此客户机实例(1)的唯一ID。接下来,使用RSocket策略和一个新的ClientHandler
实例(2)创建一个新的socketaccepter
。然后使用RSocketRequestBuilder
注册新的SocketAcceptor(3)。最后,确保通过处理RSocket onClose()
事件(4)来优雅地处理断开连接。
客户端代码就是这样。让我们继续讨论服务器端更改。
步骤4:记住服务器上的客户机
在rsocket服务器项目中要做的第一件事是通过向RSocketController.java
类添加类级变量来创建RSocketRequester
客户端的集合,如下所示:
private final List<RSocketRequester> CLIENTS = new ArrayList<>();
接下来,添加一个连接处理程序,方法如下:
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
// The code for the method will go HERE
}
@ConnectMapping
注释允许您在客户端连接事件发生时侦听它们。使用此事件,可以安排两项工作。第一种方法是将每个新客户机添加到客户机列表中。第二种方法是调用每个客户机并启动它们的遥测流。
将以下代码添加到刚刚创建的方法中:
requester.rsocket()
.onClose() // (1)
.doFirst(() -> {
log.info("Client: {} CONNECTED.", client);
CLIENTS.add(requester); // (2)
})
.doOnError(error -> {
log.warn("Channel to client {} CLOSED", client); // (3)
})
.doFinally(consumer -> {
CLIENTS.remove(requester);
log.info("Client {} DISCONNECTED", client); // (4)
})
.subscribe();
首先要注意的是对requester.rsocket().onClose()
方法(1)的调用。此方法返回一个被动的Mono
对象,其中包含您需要的所有回调。
mono的doFirst()
方法在任何subscribe()
调用之前被调用,但是在mono的初始创建之后被调用。使用此方法将客户机的请求者对象添加到客户机列表(2)。
这段代码可能会觉得不太直观—在客户端连接时调用onClose()
,然后使用生成的mono存储对新客户端的引用。有时,事件驱动的API会让人觉得有点奇怪。但是可以把它看作是RSocket连接发送给你一个“我还活着”事件的单声道。您正在使用该创建事件触发列表中每个客户机引用的存储。
只要连接出现问题,RSocket就会调用mono的doOnError()
方法。这包括客户端选择关闭连接的情况。您可以使用提供的错误变量来决定要采取的操作。在上面的代码中,错误只是被记录为警告(3)。
当RSocket连接关闭时,mono的doFinally()
方法被触发。此方法是运行从客户机列表(4)中删除客户机的代码的理想位置。
最后,subscribe()
激活您添加到mono中的反应式代码,并发出您准备好处理事件的信号。
第五步:要求客户提供免费的内存读取
当每个客户端连接时,请求遥测读数流。为此,再次使用connectShellClientAndAskForTelemetry()
方法,需要向前面添加的客户机状态消息处理程序发送请求。其代码如下:
requester.route("client-status")
.data("OPEN")
.retrieveFlux(String.class)
.doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
.subscribe();
使用请求者,以路由“client status
”为目标。发送字符串“OPEN
”作为消息数据,并检索string类型的流量。到达的每个字符串都包含客户端当前的可用内存读取。将此读数记录到控制台。最后,不要忘记subscribe()
来激活您的反应式代码。
步骤6:构建并运行RSocket服务器
是时候测试你的代码了。打开一个终端窗口,移动到rsocket server目录并按如下方式运行服务器:
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
服务器在localhost端口7000
上启动。
步骤7:构建并运行RSocket客户端
打开第二个终端窗口并移到rsocket客户机目录。在此基础上,构建并运行客户端,如下所示:
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
工作原理
一旦启动,您将注意到客户机和服务器组件在控制台中都有新的日志条目。在rsocket client窗口中,您将看到显示客户端的唯一ID(以UUID的形式)和“OPEN
”连接状态的日志条目,如下所示:
Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>
至少等待10秒钟,然后在shell:>
提示符处键入exit
。rsocket客户端现在与服务器断开连接并关闭。
现在切换到rsocket服务器窗口。日志条目如下所示:
Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED
当客户机连接时,它被添加到客户机列表中,控制台打印其客户机ID和状态“CONNECTED
”。然后,每隔5秒,日志就会显示客户端当前的“可用内存”读数。最后,当客户端断开连接时,它的RSocket通道的状态变为“CLOSED
”,客户端为“DISCONNECTED
”
您可以通过在rsocket服务器的终端窗口中按Ctrl-C
来停止rsocket服务器进程。
最后的想法
向客户呼喊的能力非常强大。它非常适合各种场景,包括移动、物联网或微服务。因为所有这些都可以通过TCP或WebSockets实现,所以您已经拥有了所需的所有基础设施,而不必求助于消息代理之类的重量级解决方案。
你在这里走了很多路。你学会了如何把服务器变成“请求者”,把客户端变成“响应者”,你学会了如何监听连接事件。您还了解了一点如何处理来自rsocket连接的错误和事件。而且,尽管在本练习中,您选择了“请求流”作为服务器-客户机通信,但您可以根据需要使用四种RSocket交互模式中的任何一种。
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2073.html
暂无评论