3年前 (2021-08-28)  相关技术 |   抢沙发  504 
文章评分 0 次,平均分 0.0

RSocket入门系列之二

在本博客中,您将继续第1部分https://javakk.com/2174.html之后的内容。您将探索RSocket通信模型Fire-and-Forget、Request-Stream和Channel。对于所有这些模型,您将创建服务器、客户机和单元测试。

在第1部分https://javakk.com/2174.html中,您学习了RSocket通信协议的基础知识。建议在继续第2部分之前先阅读第1部分。请记住,RSocket提供了4种通信模式:

  • Request-Response (a stream of 1)
  • Fire-and-Forget (no response)
  • Request-Stream (a stream of many)
  • Channel (bi-directional streams)

您在第1部分中介绍了请求-响应,其他内容将在第2部分中介绍。

本文中使用的源代码当然可以在GitHub上找到:https://github.com/mydeveloperplanet/myrsocketplanet

Fire-and-Forget模型

Fire-and-Forget模型与请求-响应模型非常相似。唯一的区别是,您不希望您的请求得到响应。

服务器端

RSocketServerController中,创建一个方法fireAndForget。由于请求不返回任何内容,因此该方法的返回类型为void。同样,使用注释@MessageMapping可以定义路由的名称。与请求-响应示例一样,服务器接收一条通知消息。为了在收到消息时看到发生的事情,只需记录通知消息。

@MessageMapping("my-fire-and-forget")
public void fireAndForget(Notification notification) {
    logger.info("Received notification: " + notification);
}

客户端

RSocketClientController中,创建一个方法fireAndForget。除了预期的返回类型之外,该实现与请求-响应示例相同。这里使用retrieveMono(Void.class)而不是retrieveMono(Notification.class)

@GetMapping("/fire-and-forget")
public Mono<Void> fireAndForget() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model");
    logger.info("Send notification for my-fire-and-forget: " + notification);
    return rSocketRequester
            .route("my-fire-and-forget")
            .data(notification)
            .retrieveMono(Void.class);
}

启动服务器和客户端并调用URL:

$ http://localhost:8080/fire-and-forget

如您所见,没有返回响应。在客户端和服务器的日志记录中,您可以验证发送和接收消息。

客户端:

Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}方

服务端:

Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}

测试方

测试同样与客户机代码和请求-响应示例非常相似。为了验证Mono是否发出任何数据,调用verifyComplete就足够了。您不需要使用Next调用ConsumerWithNext。如果Mono确实发出数据,测试应该失败。但是,将路由my-fire-and-forget替换为my-request-response,并不会使测试失败。目前尚不清楚为什么这项测试不会失败。

@Test
void testFireAndForget() {
    // Send a fire-and-forget message
    Mono<Void> result = rSocketRequester
            .route("my-fire-and-forget")
            .data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"))
            .retrieveMono(Void.class);
 
    // Assert that the result is a completed Mono.
    StepVerifier
            .create(result)
            .verifyComplete();
}

Request-Stream模型

使用Request-Stream请求流模型,您向服务器发送一个请求,并将收到一个通知消息流。

服务器端

RSocketServerController中,创建一个方法requestStream。这一次,服务器将返回大量通知消息。同样,使用注释@MessageMapping可以定义路由的名称。在本例中,在收到通知消息后,将返回一个通量,该通量每3秒发出一个新通知。

@MessageMapping("my-request-stream")
Flux<Notification> requestStream(Notification notification) {
    logger.info("Received notification for my-request-stream: " + notification);
    return Flux
            .interval(Duration.ofSeconds(3))
            .map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText()));
}

客户端

RSocketClientController中,创建一个方法requestStream。除了预期的返回类型之外,该实现与请求-响应示例相同。这里使用retrieveFlux(Notification.class)而不是retrieveMono(Notification.class)

@GetMapping("/request-stream")
public ResponseEntity<Flux<Notification>> requestStream() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model");
    logger.info("Send notification for my-request-stream: " + notification);
    Flux<Notification> notificationFlux = rSocketRequester
            .route("my-request-stream")
            .data(notification)
            .retrieveFlux(Notification.class);
    return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux);
}

启动服务器和客户端并调用URL:

$ curl http://localhost:8080/request-stream
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
 
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
 
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
 
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
...

如您所见,服务器每3秒发出一次通知。在客户端和服务器的日志记录中,您可以验证发送和接收消息。

客户端:

Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}

服务端:

Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}

测试方

测试同样类似于客户端代码。在验证过程中,先验证收到的第一条消息,然后验证是否收到5条消息,最后验证最后一条消息。

@Test
void testRequestStream() {
    // Send a request message
    Flux<Notification> result = rSocketRequester
            .route("my-request-stream")
            .data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"))
            .retrieveFlux(Notification.class);
 
    // Verify that the response messages contain the expected data
    StepVerifier
      .create(result)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .expectNextCount(5)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .thenCancel()
      .verify();
}

Channel模型

通道模型比您目前看到的其他模型要复杂一些。在这里,您将发送一个通量,作为响应,将返回一个通量。这使您能够像在聊天对话中那样来回发送消息。

服务器端

RSocketServerController中,创建一个方法通道。收到通知后,您将增加一个计数器,并且每隔一秒钟计数器notificationCount的结果将发送给客户端。为了能够跟踪正在发生的事情,您可以在接收通知和返回结果时添加日志记录。

@MessageMapping("my-channel")
public Flux<Long> channel(Flux<Notification> notifications) {
    final AtomicLong notificationCount = new AtomicLong(0);
    return notifications
        .doOnNext(notification -> {
            logger.info("Received notification for channel: " + notification);
            notificationCount.incrementAndGet();
         })
        .switchMap(notification -> 
Flux.interval(Duration.ofSeconds(1)).map(new Object() {
                private Function<Long, Long> numberOfMessages(AtomicLong notificationCount) {
                    long count = notificationCount.get();
                    logger.info("Return flux with count: " + count);
                    return i -> count;
                }
         }.numberOfMessages(notificationCount))).log();
}

客户端

RSocketClientController中,创建一个方法通道。你需要创造一个通量。为了实现这一点,您创建了3个Mono通知项,一个延迟为0秒(notification0),一个延迟为2秒(notification2),一个延迟为5秒(notification5)。您可以使用刚刚创建的Mono的组合来创建流量通知。每次通量发出通知时,您都会将其记录下来,以便能够跟踪正在发生的事情。最后,将流量发送到RSocket通道,并以Long流量的形式检索响应,然后将其返回给调用者。

@GetMapping("/channel")
public ResponseEntity<Flux<Long>> channel() {
    Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
         
    Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2)
            .doOnNext(d -> logger.info("Send notification for my-channel"));
 
    Flux<Long> numberOfNotifications = this.rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);
 
    return  ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications);
}

启动服务器和客户端并调用URL:

$ curl http://localhost:8080/channel
data:1
data:1
data:1
data:1
data:3
data:3
data:4
data:4
data:5
data:5
data:6
data:6
...

结果如预期。首先发送通知0,5秒(通知5)后,以下通知与通知0一起发送,2秒后发送通知2,2秒后发送新通知,最后2秒后发送最后一个通知。在最后一个结果之后,流量将继续发送计数为6的消息。在客户端和服务器的日志记录中,您可以验证发送和接收消息。这一次,包括时间戳,完整的日志记录包含更多的信息,为了简洁起见,这些信息被省略了。您应该在自己运行示例时更仔细地查看它。重要的是要注意每秒出现的onNext日志语句,它们与服务器发送的响应相对应。

客户端:

17:01:19.820 Send notification for my-channel
17:01:24.849 Send notification for my-channel
17:01:24.879 Send notification for my-channel
17:01:26.881 Send notification for my-channel
17:01:28.908 Send notification for my-channel
17:01:30.935 Send notification for my-channel

服务端:

17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:19.947 Return flux with count: 1
17:01:20.949 onNext(1)
17:01:21.947 onNext(1)
17:01:22.947 onNext(1)
17:01:23.947 onNext(1)
17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.882 Return flux with count: 2
17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.885 Return flux with count: 3
17:01:25.886 onNext(3)
17:01:26.886 onNext(3)
17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:26.909 Return flux with count: 4
17:01:27.910 onNext(4)
17:01:28.910 onNext(4)
17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:28.937 Return flux with count: 5
17:01:29.937 onNext(5)
17:01:30.937 onNext(5)
17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:30.964 Return flux with count: 6
17:01:31.964 onNext(6)
17:01:32.964 onNext(6)
...

测试方

测试与客户机代码类似。将消息发送到通道并验证结果计数。为了简洁起见,测试中的重复元素被省略了,完整的测试可以在GitHub上获得。

@Test
void testChannel() {
    Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
 
    Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2);
    // Send a request message
    Flux<Long> result = rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);
 
    // Verify that the response messages contain the expected data
    StepVerifier
            .create(result)
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(1);
                })
...
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(6);
                })
            .thenCancel()
            .verify();
}

结论

您已经学习了如何为RSocket通信模型Fire-and-ForgetRequest-StreamsChannel创建服务器、客户端和单元测试。到现在为止,你已经掌握了一些基本知识,可以自己进行一些探索、实验。

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2276.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册