3年前 (2021-07-29)  相关技术 |   抢沙发  1273 
文章评分 0 次,平均分 0.0
[收起] 文章目录

RSocket入门系列之一

在这篇文章中,您将学习RSocket的基础知识,RSocket是一个支持反应流的二进制应用程序协议。在介绍之后,您将学习如何将RSocket与Spring Boot结合使用。好好享受!

简介

RSocket是在TCP或WebSockets之上使用的二进制协议。RSocket是一种包含反应式原理的通信协议。这意味着RSocket使用异步通信。它也适用于推送通知。例如,当使用HTTP时,需要进行轮询以检查是否有新消息可用。这会导致不必要的网络负载。RSocket提供了一个解决方案。有4种通信模式可供使用:

  • 请求-响应(1的流)
  • 开火然后忘记(没有回应)
  • 请求流(多个流)
  • 通道(双向流)

RSocket位于OSI第5/6层,因此位于TCP/IP模型的应用层。

在下一节中,您将找到每个通信模型的示例:服务器端、客户端和单元测试。本文中使用的源代码当然可以在GitHub上找到:https://github.com/mydeveloperplanet/myrsocketplanet

请求-响应模型

请求-响应模型将允许您发送一个请求并接收一个响应作为回报。首先要做的是建立一个基本的Spring引导应用程序。导航到springinitializer网站,添加依赖关系RSocket并创建可以在您喜爱的IDE中打开的项目。在检查pom时,您注意到添加了spring-boot-starter-rsocket依赖项和reactor-test。第一个将在SpringBoot应用程序中启用RSocket支持,第二个用于测试目的。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
...
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

GitHub的源代码分为两个Maven模块,一个用于服务器,一个用于客户端。

为了在客户机和服务器之间交换信息,您创建了一个通知数据类,它将是要通过RSocket传输的项。通知类包含一个源、一个目标和一些自由文本。toString实现将用于日志记录目的。

public class Notification {
    private String source;
    private String destination;
    private String text;
 
    public Notification() {
        super();
    }
 
    public Notification(String source, String destination, String text) {
        this.source = source;
        this.destination = destination;
        this.text = text;
    }
 
    public String getSource() {
        return source;
    }
 
    public String getDestination() {
        return destination;
    }
 
    public String getText() {
        return text;
    }
 
    @Override
    public String toString() {
        return "Notification{" +
                "source='" + source + '\'' +
                ", destination='" + destination + '\'' +
                ", text='" + text + '\'' +
                '}';
    }
}

服务器端

创建一个RsocketServerController并用@Controller对其进行注释。为了创建第一个RSocket Request-Response示例,您只需添加一个方法requestResponse,该方法接受一个通知,记录接收到的通知,并返回一个新的通知,在该通知中交换接收到的源和目标,并向其添加一个简单的文本。为了使它成为一个RSocket请求,您需要用@MessageMapping注释这个方法,并给它一个名称,例如my request response。

@Controller
public class RsocketServerController {
 
    Logger logger = LoggerFactory.getLogger(RsocketServerController.class);
 
    @MessageMapping("my-request-response")
    public Notification requestResponse(Notification notification) {
        logger.info("Received notification for my-request-response: " + notification);
        return new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText());
    }
}

为了确保RSocket服务器已启动,还需要将端口添加到application.properties文件中:

spring.rsocket.server.port=7000

启动服务器:

$ mvn spring-boot:run

在日志记录中,您注意到Netty web服务器已经启动。Netty是Jetty web服务器的反应式对应物。

Netty RSocket started on port(s): 7000

客户端

客户端稍微复杂一点。您将再次创建一个SpringBoot应用程序,它将向服务器发送通知消息。发送消息将通过http调用调用。因此,将依赖关系spring-boot-starter-webflux添加到客户端pom中。请注意,您不能使用spring-boot-starter-web,您需要使用被动webflux变体。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

确保您没有在application.properties中定义端口,否则将启动RSocket服务器,而这不是您的客户机所需要的。执行此操作时,控制台中将出现以下错误。

2021-01-02 12:04:58.853 ERROR 19058 --- [           main] o.s.boot.SpringApplication               : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'rSocketServerBootstrap'; nested exception is reactor.netty.ChannelBindException: Failed to bind on [0.0.0.0:7000]
...
Caused by: reactor.netty.ChannelBindException: Failed to bind on [0.0.0.0:7000]
    Suppressed: java.lang.Exception: #block terminated with an error
...

创建一个RsocketClientController并用@RestController对其进行注释。接下来,您需要创建一个RSocketRequester实例,以便能够连接到RSocket服务器。在requestResponse方法中,创建通知消息(为了方便使用,只需从服务器模块复制通知类,并确保它也存在于客户端),并使用rSocketRequester实例,指定要将消息发送到的路由(名称等于服务器端的@MessageMapping注释所指定的名称)、要发送的数据,最后是期望的响应。响应将是Mono,这意味着您希望从服务器得到一个响应,而响应需要是一个通知消息。消息本身被返回给调用者。

@RestController
public class RsocketClientController {
 
    private static final String CLIENT = "Client";
    private static final String SERVER = "Server";
 
    private final RSocketRequester rSocketRequester;
 
    Logger logger = LoggerFactory.getLogger(RsocketClientController.class);
 
    public RsocketClientController(@Autowired RSocketRequester.Builder builder) {
        this.rSocketRequester = builder.tcp("localhost", 7000);
    }
 
    @GetMapping("/request-response")
    public Mono<Notification> requestResponse() {
        Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Response interaction model");
        logger.info("Send notification for my-request-response: " + notification);
        return rSocketRequester
                .route("my-request-response")
                .data(notification)
                .retrieveMono(Notification.class);
    }
}

启动服务器和客户端并调用URL:

$ curl http://localhost:8080/request-response
{"source":"Server","destination":"Client","text":"In response to: Test the Request-Response interaction model"}

如您所见,将返回服务器响应。在客户机和服务器的日志记录中,您可以验证消息的发送和接收。

客户端:

Send notification for my-request-response: Notification{source='Client', destination='Server', text='Test the Request-Response interaction model'}

服务端:

Received notification for my-request-response: Notification{source='Client', destination='Server', text='Test the Request-Response interaction model'}

验证

为服务器代码创建测试与创建客户机代码非常相似。需要创建RSocketRequester才能设置连接。发送消息与客户端代码相同,只是这次将响应放入Mono<Notification>类型的结果变量中。您可以在StepVerifier中使用此结果变量来验证接收到的响应。使用StepVerifier,您可以在单元测试中验证被动响应。

@SpringBootTest
class MyRsocketServerPlanetApplicationTests {
 
    private static final String CLIENT = "Client";
    private static final String SERVER = "Server";
 
    private static RSocketRequester rSocketRequester;
 
    @BeforeAll
    public static void setupOnce(@Autowired RSocketRequester.Builder builder, @Value("${spring.rsocket.server.port}") Integer port) {
        rSocketRequester = builder.tcp("localhost", port);
    }
 
    @Test
    void testRequestResponse() {
        // Send a request message
        Mono<Notification> result = rSocketRequester
                .route("my-request-response")
                .data(new Notification(CLIENT, SERVER, "Test the Request-Response interaction model"))
                .retrieveMono(Notification.class);
 
        // Verify that the response message contains the expected data
        StepVerifier
                .create(result)
                .consumeNextWith(notification -> {
                    assertThat(notification.getSource()).isEqualTo(SERVER);
                    assertThat(notification.getDestination()).isEqualTo(CLIENT);
                    assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Response interaction model");
                })
                .verifyComplete();
    }
}

原文地址:https://mydeveloperplanet.com/2021/02/03/getting-started-with-rsocket-part-1/

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2174.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册