3年前 (2021-03-14)  相关技术 |   抢沙发  615 
文章评分 1 次,平均分 5.0

响应式协议RSocket介绍

什么是RSocket

RSocket是一种用于分布式应用程序的二进制点对点通信协议。从这个意义上说,它提供了一种替代HTTP等其他协议的方法。

RSocket和其他协议之间的全面比较超出了本文的范围。相反,我们将关注RSocket的一个关键特性:它的交互模型。

RSocket提供了四种交互模型。考虑到这一点,我们将用一个例子来探讨每一个问题。

Maven依赖项

对于我们的示例,RSocket只需要两个直接依赖项:

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.11.13</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>0.11.13</version>
</dependency>

rsocket corersocket transport netty依赖项在Maven Central上提供。

一个重要的注意事项是RSocket库经常使用反应流。本文中使用了FluxMono类,因此对它们的基本理解将很有帮助。

服务器设置

首先,让我们创建服务器类:

public class Server {
    private final Disposable server;

    public Server() {
        this.server = RSocketFactory.receive()
          .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
          .transport(TcpServerTransport.create("localhost", TCP_PORT))
          .start()
          .subscribe();
    }

    public void dispose() {
        this.server.dispose();
    }

    private class RSocketImpl extends AbstractRSocket {}
}

接下来,要启动服务器,我们只需要实例化它:

Server server = new Server();

一个服务器实例可以处理多个连接。因此,只有一个服务器实例支持我们的所有示例。

完成后,dispose方法将停止服务器并释放TCP端口。

交互作用模型

请求/响应

RSocket提供了一个请求/响应模型——每个请求接收一个响应。

对于这个模型,我们将创建一个简单的服务,将消息返回给客户机。

让我们首先向AbstractRSocket的扩展RSocketImpl添加一个方法:

@Override
public Mono<Payload> requestResponse(Payload payload) {
    try {
        return Mono.just(payload); // reflect the payload back to the sender
    } catch (Exception x) {
        return Mono.error(x);
    }
}

requestResponse方法为每个请求返回一个结果,我们可以通过Mono<Payload>响应类型看到这一点。

Payload是包含消息内容和元数据的类。所有的交互模型都使用它。有效负载的内容是二进制的,但是有一些方便的方法支持基于字符串的内容。

接下来,我们可以创建客户端类:

public class ReqResClient {

    private final RSocket socket;

    public ReqResClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public String callBlocking(String string) {
        return socket
          .requestResponse(DefaultPayload.create(string))
          .map(Payload::getDataUtf8)
          .block();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

客户端使用RSocketFactory.connect()方法启动与服务器的套接字连接。我们使用套接字上的requestResponse方法向服务器发送有效负载。

我们的负载包含传递到客户端的字符串。当Mono<Payload>响应到达时,我们可以使用getDataUtf8()方法访问响应的字符串内容。

最后,我们可以运行集成测试来查看请求/响应的实际情况。我们将向服务器发送一个字符串,并验证返回的字符串是否相同:

@Test
public void whenSendingAString_thenRevceiveTheSameString() {
    ReqResClient client = new ReqResClient();
    String string = "Hello RSocket";

    assertEquals(string, client.callBlocking(string));

    client.dispose();
}

Fire-and-Forget

使用fire-and-forget模型,客户端将不会收到来自服务器的响应。

在本例中,客户机将以50ms的间隔向服务器发送模拟测量。服务器将发布测量结果。

让我们在RSocketImpl类中向服务器添加一个fire-and-forget处理程序:

@Override
public Mono<Void> fireAndForget(Payload payload) {
    try {
        dataPublisher.publish(payload); // forward the payload
        return Mono.empty();
    } catch (Exception x) {
        return Mono.error(x);
    }
}

这个处理程序看起来非常类似于请求/响应处理程序。但是,fireAndForget返回Mono<Void>而不是Mono<Payload>

dataPublisherorg.reactivestreams.Publisher的实例. 因此,它使有效负载对订户可用。我们将在请求/流示例中使用它。

接下来,我们将创建fire-and-forget客户端:

public class FireNForgetClient {
    private final RSocket socket;
    private final List<Float> data;

    public FireNForgetClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    /** Send binary velocity (float) every 50ms */
    public void sendData() {
        data = Collections.unmodifiableList(generateData());
        Flux.interval(Duration.ofMillis(50))
          .take(data.size())
          .map(this::createFloatPayload)
          .flatMap(socket::fireAndForget)
          .blockLast();
    }

    // ... 
}

套接字设置与以前完全相同。

sendData()方法使用流量流发送多条消息。对于每条消息,我们调用socket::fireAndForget

我们需要为每条消息订阅Mono<Void>响应。如果我们忘记订阅,那么socket::fireAndForget将不会执行。

flatMap操作符确保将Void响应传递给订阅者,而blockLast操作符充当订阅者。

我们要等到下一节才开始测试。此时,我们将创建一个请求/流客户机来接收fire-and-forget客户机推送的数据。

请求/流

request/stream 请求/流模型中,单个请求可以接收多个响应。看到这一点的行动,我们可以建立在fire-and-forget 的例子。为此,我们请求一个流来检索上一节中发送的度量。

与前面一样,让我们首先向服务器上的RSocketImpl添加一个新的侦听器:

@Override
public Flux<Payload> requestStream(Payload payload) {
    return Flux.from(dataPublisher);
}

requestStream处理程序返回一个Flux<Payload>流。正如我们在上一节中所回顾的,fireAndForget处理程序将传入的数据发布到dataPublisher。现在,我们将使用与事件源相同的dataPublisher创建一个Flux流。通过这样做,测量数据将从fire-and-forget客户端异步流到请求/流客户端。

接下来创建请求/流客户端:

public class ReqStreamClient {

    private final RSocket socket;

    public ReqStreamClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public Flux<Float> getDataStream() {
        return socket
          .requestStream(DefaultPayload.create(DATA_STREAM_NAME))
          .map(Payload::getData)
          .map(buf -> buf.getFloat())
          .onErrorReturn(null);
    }

    public void dispose() {
        this.socket.dispose();
    }
}

我们与服务器的连接方式与以前的客户机相同。

getDataStream()中,我们使用socket.requestStream()从服务器接收Flux<Payload>流。从该流中,我们从二进制数据中提取浮点值。最后,流被返回给调用者,允许调用者订阅流并处理结果。

现在让我们测试一下。我们将验证从火的往返路线,并忘记请求/流。

我们可以断言每个值的接收顺序与发送顺序相同。然后,我们可以断言接收到的值与发送的值相同:

@Test
public void whenSendingStream_thenReceiveTheSameStream() {
    FireNForgetClient fnfClient = new FireNForgetClient(); 
    ReqStreamClient streamClient = new ReqStreamClient();

    List<Float> data = fnfClient.getData();
    List<Float> dataReceived = new ArrayList<>();

    Disposable subscription = streamClient.getDataStream()
      .index()
      .subscribe(
        tuple -> {
            assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
            dataReceived.add(tuple.getT2());
        },
        err -> LOG.error(err.getMessage())
      );

    fnfClient.sendData();

    // ... dispose client & subscription

    assertEquals("Wrong data count received", data.size(), dataReceived.size());
}

Channel

Channel模型提供双向通信。在这个模型中,消息流在两个方向上异步流动。

让我们创建一个简单的游戏模拟来测试这一点。在这个游戏中,通道的每一方都将成为一个玩家。当游戏运行时,这些玩家将以随机的时间间隔向对方发送消息。对方会对信息做出反应。

首先,我们将在服务器上创建处理程序。与前面一样,我们在RSocketImpl中添加了:

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    Flux.from(payloads)
      .subscribe(gameController::processPayload);
    return Flux.from(gameController);
}

requestChannel处理程序具有用于输入和输出的有效负载流。Publisher<Payload>输入参数是从客户机接收的有效负载流。当它们到达时,这些有效负载被传递给gameController::processPayload函数。

作为响应,我们将不同的流量流返回给客户机。这个流是从我们的gameController创建的,它也是一个发布者。

以下是GameController类的摘要:

public class GameController implements Publisher<Payload> {
    
    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {
        // send Payload messages to the subscriber at random intervals
    }

    public void processPayload(Payload payload) {
        // react to messages from the other player
    }
}

GameController收到订户时,它开始向该订户发送消息。

接下来,让我们创建客户端:

public class ChannelClient {

    private final RSocket socket;
    private final GameController gameController;

    public ChannelClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();

        this.gameController = new GameController("Client Player");
    }

    public void playGame() {
        socket.requestChannel(Flux.from(gameController))
          .doOnNext(gameController::processPayload)
          .blockLast();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

正如我们在前面的示例中看到的,客户机以与其他客户机相同的方式连接到服务器。

客户端创建自己的GameController实例。

我们使用socket.requestChannel()将有效负载流发送到服务器。服务器用自己的有效负载流进行响应。

当从服务器接收到有效负载时,我们将它们传递给gameController::processPayload处理程序。

在我们的游戏模拟中,客户端和服务器是彼此的镜像。即,每一方正在发送有效负载流并且从另一端接收有效负载流。

流独立运行,没有同步。

最后,让我们在测试中运行模拟:

@Test
public void whenRunningChannelGame_thenLogTheResults() {
    ChannelClient client = new ChannelClient();
    client.playGame();
    client.dispose();
}

总结

在这篇介绍性文章中,我们探讨了RSocket提供的交互模型。示例的完整源代码可以在我们的Github存储库中找到:https://github.com/eugenp/tutorials/tree/master/rsocket。

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1712.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册