在本博客中,您将继续第1部分https://javakk.com/2174.html之后的内容。您将探索RSocket通信模型Fire-and-Forget、Request-Stream和Channel。对于所有这些模型,您将创建服务器、客户机和单元测试。
在第1部分https://javakk.com/2174.html中,您学习了RSocket通信协议的基础知识。建议在继续第2部分之前先阅读第1部分。请记住,RSocket提供了4种通信模式:
- Request-Response (a stream of 1)
- Fire-and-Forget (no response)
- Request-Stream (a stream of many)
- Channel (bi-directional streams)
您在第1部分中介绍了请求-响应,其他内容将在第2部分中介绍。
本文中使用的源代码当然可以在GitHub上找到:https://github.com/mydeveloperplanet/myrsocketplanet
Fire-and-Forget模型
Fire-and-Forget模型与请求-响应模型非常相似。唯一的区别是,您不希望您的请求得到响应。
服务器端
在RSocketServerController
中,创建一个方法fireAndForget
。由于请求不返回任何内容,因此该方法的返回类型为void。同样,使用注释@MessageMapping
可以定义路由的名称。与请求-响应示例一样,服务器接收一条通知消息。为了在收到消息时看到发生的事情,只需记录通知消息。
@MessageMapping("my-fire-and-forget")
public void fireAndForget(Notification notification) {
logger.info("Received notification: " + notification);
}
客户端
在RSocketClientController
中,创建一个方法fireAndForget
。除了预期的返回类型之外,该实现与请求-响应示例相同。这里使用retrieveMono(Void.class)
而不是retrieveMono(Notification.class)
@GetMapping("/fire-and-forget")
public Mono<Void> fireAndForget() {
Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model");
logger.info("Send notification for my-fire-and-forget: " + notification);
return rSocketRequester
.route("my-fire-and-forget")
.data(notification)
.retrieveMono(Void.class);
}
启动服务器和客户端并调用URL:
$ http://localhost:8080/fire-and-forget
如您所见,没有返回响应。在客户端和服务器的日志记录中,您可以验证发送和接收消息。
客户端:
Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}方
服务端:
Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
测试方
测试同样与客户机代码和请求-响应示例非常相似。为了验证Mono是否发出任何数据,调用verifyComplete
就足够了。您不需要使用Next调用ConsumerWithNext
。如果Mono确实发出数据,测试应该失败。但是,将路由my-fire-and-forget
替换为my-request-response
,并不会使测试失败。目前尚不清楚为什么这项测试不会失败。
@Test
void testFireAndForget() {
// Send a fire-and-forget message
Mono<Void> result = rSocketRequester
.route("my-fire-and-forget")
.data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"))
.retrieveMono(Void.class);
// Assert that the result is a completed Mono.
StepVerifier
.create(result)
.verifyComplete();
}
Request-Stream模型
使用Request-Stream请求流模型,您向服务器发送一个请求,并将收到一个通知消息流。
服务器端
在RSocketServerController
中,创建一个方法requestStream
。这一次,服务器将返回大量通知消息。同样,使用注释@MessageMapping
可以定义路由的名称。在本例中,在收到通知消息后,将返回一个通量,该通量每3秒发出一个新通知。
@MessageMapping("my-request-stream")
Flux<Notification> requestStream(Notification notification) {
logger.info("Received notification for my-request-stream: " + notification);
return Flux
.interval(Duration.ofSeconds(3))
.map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText()));
}
客户端
在RSocketClientController
中,创建一个方法requestStream
。除了预期的返回类型之外,该实现与请求-响应示例相同。这里使用retrieveFlux(Notification.class)
而不是retrieveMono(Notification.class)
。
@GetMapping("/request-stream")
public ResponseEntity<Flux<Notification>> requestStream() {
Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model");
logger.info("Send notification for my-request-stream: " + notification);
Flux<Notification> notificationFlux = rSocketRequester
.route("my-request-stream")
.data(notification)
.retrieveFlux(Notification.class);
return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux);
}
启动服务器和客户端并调用URL:
$ curl http://localhost:8080/request-stream
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
...
如您所见,服务器每3秒发出一次通知。在客户端和服务器的日志记录中,您可以验证发送和接收消息。
客户端:
Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
服务端:
Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
测试方
测试同样类似于客户端代码。在验证过程中,先验证收到的第一条消息,然后验证是否收到5条消息,最后验证最后一条消息。
@Test
void testRequestStream() {
// Send a request message
Flux<Notification> result = rSocketRequester
.route("my-request-stream")
.data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"))
.retrieveFlux(Notification.class);
// Verify that the response messages contain the expected data
StepVerifier
.create(result)
.consumeNextWith(notification -> {
assertThat(notification.getSource()).isEqualTo(SERVER);
assertThat(notification.getDestination())
.isEqualTo(CLIENT);
assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
.expectNextCount(5)
.consumeNextWith(notification -> {
assertThat(notification.getSource()).isEqualTo(SERVER);
assertThat(notification.getDestination())
.isEqualTo(CLIENT);
assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
.thenCancel()
.verify();
}
Channel模型
通道模型比您目前看到的其他模型要复杂一些。在这里,您将发送一个通量,作为响应,将返回一个通量。这使您能够像在聊天对话中那样来回发送消息。
服务器端
在RSocketServerController
中,创建一个方法通道。收到通知后,您将增加一个计数器,并且每隔一秒钟计数器notificationCount
的结果将发送给客户端。为了能够跟踪正在发生的事情,您可以在接收通知和返回结果时添加日志记录。
@MessageMapping("my-channel")
public Flux<Long> channel(Flux<Notification> notifications) {
final AtomicLong notificationCount = new AtomicLong(0);
return notifications
.doOnNext(notification -> {
logger.info("Received notification for channel: " + notification);
notificationCount.incrementAndGet();
})
.switchMap(notification ->
Flux.interval(Duration.ofSeconds(1)).map(new Object() {
private Function<Long, Long> numberOfMessages(AtomicLong notificationCount) {
long count = notificationCount.get();
logger.info("Return flux with count: " + count);
return i -> count;
}
}.numberOfMessages(notificationCount))).log();
}
客户端
在RSocketClientController
中,创建一个方法通道。你需要创造一个通量。为了实现这一点,您创建了3个Mono通知项,一个延迟为0秒(notification0),一个延迟为2秒(notification2),一个延迟为5秒(notification5)。您可以使用刚刚创建的Mono的组合来创建流量通知。每次通量发出通知时,您都会将其记录下来,以便能够跟踪正在发生的事情。最后,将流量发送到RSocket通道,并以Long流量的形式检索响应,然后将其返回给调用者。
@GetMapping("/channel")
public ResponseEntity<Flux<Long>> channel() {
Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2)
.doOnNext(d -> logger.info("Send notification for my-channel"));
Flux<Long> numberOfNotifications = this.rSocketRequester
.route("my-channel")
.data(notifications)
.retrieveFlux(Long.class);
return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications);
}
启动服务器和客户端并调用URL:
$ curl http://localhost:8080/channel
data:1
data:1
data:1
data:1
data:3
data:3
data:4
data:4
data:5
data:5
data:6
data:6
...
结果如预期。首先发送通知0,5秒(通知5)后,以下通知与通知0一起发送,2秒后发送通知2,2秒后发送新通知,最后2秒后发送最后一个通知。在最后一个结果之后,流量将继续发送计数为6的消息。在客户端和服务器的日志记录中,您可以验证发送和接收消息。这一次,包括时间戳,完整的日志记录包含更多的信息,为了简洁起见,这些信息被省略了。您应该在自己运行示例时更仔细地查看它。重要的是要注意每秒出现的onNext
日志语句,它们与服务器发送的响应相对应。
客户端:
17:01:19.820 Send notification for my-channel
17:01:24.849 Send notification for my-channel
17:01:24.879 Send notification for my-channel
17:01:26.881 Send notification for my-channel
17:01:28.908 Send notification for my-channel
17:01:30.935 Send notification for my-channel
服务端:
17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:19.947 Return flux with count: 1
17:01:20.949 onNext(1)
17:01:21.947 onNext(1)
17:01:22.947 onNext(1)
17:01:23.947 onNext(1)
17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.882 Return flux with count: 2
17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.885 Return flux with count: 3
17:01:25.886 onNext(3)
17:01:26.886 onNext(3)
17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:26.909 Return flux with count: 4
17:01:27.910 onNext(4)
17:01:28.910 onNext(4)
17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:28.937 Return flux with count: 5
17:01:29.937 onNext(5)
17:01:30.937 onNext(5)
17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:30.964 Return flux with count: 6
17:01:31.964 onNext(6)
17:01:32.964 onNext(6)
...
测试方
测试与客户机代码类似。将消息发送到通道并验证结果计数。为了简洁起见,测试中的重复元素被省略了,完整的测试可以在GitHub上获得。
@Test
void testChannel() {
Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2);
// Send a request message
Flux<Long> result = rSocketRequester
.route("my-channel")
.data(notifications)
.retrieveFlux(Long.class);
// Verify that the response messages contain the expected data
StepVerifier
.create(result)
.consumeNextWith(count -> {
assertThat(count).isEqualTo(1);
})
...
.consumeNextWith(count -> {
assertThat(count).isEqualTo(6);
})
.thenCancel()
.verify();
}
结论
您已经学习了如何为RSocket通信模型Fire-and-Forget、Request-Streams和Channel创建服务器、客户端和单元测试。到现在为止,你已经掌握了一些基本知识,可以自己进行一些探索、实验。
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2276.html
暂无评论