3年前 (2021-04-14)  相关技术 |   抢沙发  999 
文章评分 0 次,平均分 0.0

响应式服务通信协议RSocket

分布式系统中的通信问题

微服务无处不在。我们经历了一段漫长的旅程,从难以部署和维护的单片应用程序,到完全分布式、微型、可扩展的微服务。这样的架构设计有很多优点,但是也有缺点,值得一提。首先,为了向最终客户提供价值,服务必须交换成吨的数据。在单片应用程序中,这不是问题,因为整个通信都发生在单个JVM中。在微服务体系结构中,服务部署在单独的容器中,并通过内部或外部网络进行通信,网络是一流的公民。如果您决定在云中运行应用程序,事情会变得更加复杂,在云中,网络问题和延迟时间的增加是您无法完全避免的。与其试图解决网络问题,不如让您的体系结构具有弹性,即使在动荡时期也能完全运行。

让我们深入探讨一下微服务、数据、通信和云的概念。作为一个例子,我们将讨论企业级系统,它可以通过网站和移动应用程序访问,也可以与小型外部设备(如家用加热器控制器)通信。这个系统由多个微服务组成,大部分是用Java编写的,它有一些Python和Java语言Node.js组件。显然,它们都是跨多个可用性区域复制的,以确保整个系统的高可用性。

为了与IaaS提供商无关并改善开发人员的体验,应用程序运行在PaaS之上。我们在这里有各种各样的可能性:cloudfoundry、Kubernetes或两者结合在Cloudboostr中都是合适的。在服务之间的通信方面,设计很简单。每个组件都公开了普通的restapi–如下图所示。

响应式服务通信协议RSocket
乍一看,这样的架构看上去并不差。组件被分离并在云中运行-会出什么问题?实际上,有两个主要的问题——都与沟通有关。

第一个问题是HTTP的请求/响应交互模型。虽然它有很多用例,但它不是为机器到机器的通信而设计的。微服务将一些数据发送到另一个组件而不关心操作的结果(触发并忘记)或在数据可用时自动流化数据(数据流化)的情况并不少见。使用请求/响应交互模型很难以优雅、高效的方式实现这些通信模式。即使执行简单的fire-and-forget操作也有副作用——服务器必须将响应发送回客户机,即使客户机对处理它不感兴趣。

第二个问题是性能。假设我们的系统被客户大量使用,流量增加,并且我们已经注意到我们正在努力处理每秒超过几百个请求。多亏了容器和云,我们能够轻松地扩展我们的服务。但是,如果我们进一步跟踪资源消耗,我们会注意到,当内存耗尽时,vm的cpu几乎处于空闲状态。这个问题来自通常与HTTP1.x一起使用的每请求线程模型,其中每个请求都有自己的堆栈内存。在这种情况下,我们可以利用反应式编程模型和非阻塞IO。它将显著减少内存使用,但不会减少延迟。http1.x是一个基于文本的协议,因此需要传输的数据的大小比二进制协议要大得多。

在机器对机器的通信中,我们不应该局限于HTTP(尤其是1.x),它的请求/响应交互模型和糟糕的性能。市场上有许多更合适、更强大的解决方案。基于RabbitMQ、gRPC甚至http2的消息传递,以及对多路复用和二进制有效负载的支持,在性能和效率方面将比普通的http1.x做得更好。

响应式服务通信协议RSocket
通过使用多种协议,我们可以在给定的场景中以最有效、最合适的方式链接微服务。然而,多种协议的采用迫使我们一次又一次地重新发明轮子。我们必须用与安全相关的额外信息来丰富我们的数据,并创建多个适配器来处理协议之间的转换。在某些情况下,运输需要高度可用的外部资源(经纪人、服务等)。额外的资源需要额外的成本,即使我们所需要的只是简单的、基于消息的fire-and-forget操作。此外,许多不同的协议可能会带来与应用程序管理相关的严重问题,特别是当我们的系统由数百个微服务组成时。

上面提到的问题是RSocket被发明的核心原因,也是它可能会彻底改变云中通信的原因。通过其反应性和内置的健壮交互模型,RSocket可以应用于各种业务场景,并最终统一我们在分布式系统中使用的通信模式。

RSocket

RSocket是一种新的、消息驱动的二进制协议,它标准化了云中的通信方法。它有助于以一致的方式解决常见的应用程序问题,并支持多种语言(如java、js、python)和传输层(TCP、WebSocket、Aeron)。

在下面的部分中,我们将深入研究协议内部并讨论交互模型。

框架和消息驱动

RSocket中的交互被分解为帧。每个帧由一个帧头组成,其中包含流id、帧类型定义和特定于帧类型的其他数据。帧头后面是元数据和有效载荷–这些部分携带用户指定的数据。

响应式服务通信协议RSocket
有多种类型的框架表示交互模型的不同操作和可用方法。我们不打算涵盖所有这些问题,因为它们在官方文件中有大量描述(http://rsocket.io/docs/Protocol). 然而,值得注意的却很少。其中之一是客户端在通信开始时发送给服务器的设置框架。可以自定义此框架,以便您可以在连接初始化期间添加自己的安全规则或其他所需信息。需要注意的是,在连接设置阶段之后,RSocket并不区分客户机和服务器。每一方都可以开始向另一方发送数据——这使得协议几乎完全对称。

性能

帧以字节流的形式发送。它使RSocket方式比典型的基于文本的协议更有效。从开发人员的角度来看,当json在网络中来回运行时,调试系统更容易,但对性能的影响使这种方便性受到质疑。该协议没有强加任何特定的序列化/反序列化机制,它将帧视为可以转换为任何内容的比特包。这使得使用JSON序列化或Protobuf或AVRO等更有效的解决方案成为可能。

对RSocket性能有巨大影响的第二个因素是复用。该协议在单个物理连接的顶部创建逻辑流(通道)。每个流都有其唯一的ID,在某种程度上,可以将其解释为我们从消息传递系统知道的队列。这样的设计处理了HTTP1.x中已知的主要问题——按请求连接模型和“流水线”的弱性能。而且,RSocket本机支持传输大型有效负载。在这样的场景中,有效负载帧被分割成多个帧,并带有一个额外的标志——给定片段的序号。

反应性和流量控制

RSocket协议完全包含反应宣言中所述的原则。它的异步特性和资源节约有助于减少最终用户所经历的延迟和基础设施的成本。多亏了流媒体,我们不需要将数据从一个服务拉到另一个服务,而是在数据可用时推送数据。这是一个非常强大的机制,但也可能有风险。让我们考虑一个简单的场景:在我们的系统中,我们将事件从服务a流式传输到服务B。在接收方执行的操作非常重要,需要一些计算时间。如果服务A推送事件的速度快于B处理事件的速度,最终,B将耗尽资源—发送方将杀死接收方。由于RSocket使用反应器,因此它内置了对流量控制的支持,这有助于避免此类情况。

我们可以很容易地提供背压机制的实现,调整到我们的需要。接收方可以指定要消耗多少数据,并且在通知发送方它准备好处理更多数据之前,不会得到更多数据。另一方面,为了限制来自请求者的传入帧的数量,RSocket实现了一种租用机制。响应者可以指定请求者在定义的时间范围内可以发送多少请求。

API

如前一节所述,RSocket使用Reactor,因此在API级别上,我们主要操作MonoFlux对象。它还完全支持反应信号——我们可以轻松地对不同的事件实施“反应”——onNextonErroronClose等。

下面几段将介绍API和RSocket中可用的每个交互选项。讨论将以代码片段和所有示例的描述作为支持。在我们进入交互模型之前,有必要先介绍一下API基础知识,因为它将在多个代码示例中出现。

设置与RSocketFactory的连接

在对等点之间建立RSocket连接相当容易。API为factory(RSocketFactory)提供工厂方法receive和connect,分别在客户端和服务器端创建RSocket和CloseableChannel实例。通信双方(请求者和响应者)中的第二个公共属性是传输。RSocket可以使用多种解决方案作为传输层(TCP、WebSocket、Aeron)。无论您选择哪个API,它都提供工厂方法,允许您调整和调优连接。

RSocketFactory.receive()
        .acceptor(new HelloWorldSocketAcceptor())
        .transport(TcpServerTransport.create(HOST, PORT))
        .start()
        .subscribe();

RSocketFactory.connect()
        .transport(TcpClientTransport.create(HOST, PORT))
        .start()
        .subscribe();

此外,对于响应程序,我们必须创建一个套接字接受程序实例。SocketAcceptor是一个接口,它在对等方之间提供契约。它有一个accept方法,它接受RSocket来发送请求,并返回一个RSocket实例来处理来自对等方的请求。除了提供契约外,SocketAcceptor还允许我们访问设置框架内容。在API级别上,它由ConnectionSetupPayload对象反映。

public interface SocketAcceptor {
   Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);
}

如上所示,在对等点之间建立连接相对容易,特别是对于以前使用WebSockets的人来说——就API而言,这两种解决方案非常相似。

交互作用模型

在建立了连接之后,我们就可以进入交互模型了。RSocket支持以下操作:

响应式服务通信协议RSocket
fireforget以及元数据推送都是为了将数据从发送方推送到接收方。在这两种情况下,发送方都不关心操作的结果——它反映在返回类型(Mono)的API级别上。这些动作的区别就在画面上。在fire and forget的情况下,完全成熟的帧被发送到接收器,而对于元数据推送操作,帧没有有效负载-它只包括报头和元数据。这样的轻量级消息可用于向物联网设备的移动或对等通信发送通知。

RSocket还能够模拟HTTP行为。它支持请求-响应语义,这可能是您将使用RSocket的主要交互类型。在streams上下文中,这样的操作可以表示为由单个对象组成的流。在这种情况下,客户机正在等待响应帧,但它是以完全非阻塞的方式进行的。

在云应用程序中更有趣的是请求流和请求通道交互,它们在数据流上运行,通常是无限的。在请求流操作的情况下,请求者向响应者发送一个帧并取回数据流。这种交互方法使服务能够从拉数据策略切换到推数据策略。请求者可以订阅流并对传入的数据做出反应,而不是定期向响应者发送请求—当数据可用时,它将自动到达。

多亏了多路复用和双向数据传输支持,我们可以使用请求通道方法更进一步。RSocket能够使用单个物理连接将数据从请求者流到响应者,反之亦然。当请求者更新订阅(例如,更改订阅条件)时,这种交互可能很有用。如果没有双向通道,客户端将不得不取消流并使用新参数重新请求它。

在API中,交互模型的所有操作都由如下所示的RSocket接口的方法表示。

public interface RSocket extends Availability, Closeable {
 
    Mono<Void> fireAndForget(Payload payload);
 
    Mono<Payload> requestResponse(Payload payload);
 
    Flux<Payload> requestStream(Payload payload);
 
    Flux<Payload> requestChannel(Publisher<Payload> payloads);
 
    Mono<Void> metadataPush(Payload payload);
 
}

为了提高开发人员的体验,避免实现RSocket接口的每个方法的必要性,API提供了我们可以扩展的抽象RSocket。通过将SocketAcceptor和AbstractRSocket放在一起,我们得到了服务器端的实现,在基本场景中可能如下所示:

@Slf4j
public class HelloWorldSocketAcceptor implements SocketAcceptor {
 
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
        log.info("Received connection with setup payload: [{}] and meta-data: [{}]", setup.getDataUtf8(), setup.getMetadataUtf8());
        return Mono.just(new AbstractRSocket() {
            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                log.info("Received 'fire-and-forget' request with payload: [{}]", payload.getDataUtf8());
                return Mono.empty();
            }
 
            @Override
            public Mono<Payload> requestResponse(Payload payload) {         
                log.info("Received 'request response' request with payload: [{}] ", payload.getDataUtf8());
                return Mono.just(DefaultPayload.create("Hello " + payload.getDataUtf8()));
            }
 
            @Override
            public Flux<Payload> requestStream(Payload payload) {
                log.info("Received 'request stream' request with payload: [{}] ", payload.getDataUtf8());
                return Flux.interval(Duration.ofMillis(1000))
                        .map(time -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()));
            }
 
            @Override
            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return Flux.from(payloads)
                        .doOnNext(payload -> {
                            log.info("Received payload: [{}]", payload.getDataUtf8());
                        })
                        .map(payload -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()))
                        .subscribeOn(Schedulers.parallel());
            }
 
            @Override
            public Mono<Void> metadataPush(Payload payload) {
                log.info("Received 'metadata push' request with metadata: [{}]", payload.getMetadataUtf8());
                return Mono.empty();
            }
        });
    }
}

在发送方,使用交互模型非常简单,我们所需要做的就是在使用RSocketFactory创建的RSocket实例上调用一个特定的方法。

socket.fireAndForget(DefaultPayload.create("Hello world!"));

发送方更有趣的是背压机制的实现。让我们考虑以下请求方实现的示例:

public class RequestStream {
 
    public static void main(String[] args) {
 
        RSocket socket = RSocketFactory.connect()
                .transport(TcpClientTransport.create(HOST, PORT))
                .start()
                .block();
        socket.requestStream(DefaultPayload.create("Jenny", "example-metadata"))
                .subscribe(new BackPressureSubscriber());
 
        socket.dispose();
}
 
    @Slf4j
    private static class BackPressureSubscriber implements Subscriber<Payload> {
 
        private static final Integer NUMBER_OF_REQUESTED_ITEMS = 5;
        private Subscription subscription;
        int receivedItems;
 
        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            subscription.request(NUMBER_OF_REQUESTED_ITEMS);
        }
 
        @Override
        public void onNext(Payload payload) {
            receivedItems++;
            if (receivedItems % NUMBER_OF_REQUESTED_ITEMS == 0) {
                log.info("Requesting next [{}] elements", NUMBER_OF_REQUESTED_ITEMS);
                subscription.request(NUMBER_OF_REQUESTED_ITEMS);
            }
        }
 
        @Override
        public void onError(Throwable t) {
            log.error("Stream subscription error [{}]", t);
        }
 
        @Override
        public void onComplete() {
            log.info("Completing subscription");
        }
    }
     
}

在本例中,我们请求数据流,但为了确保传入的帧不会杀死请求者,我们设置了背压机制。为了实现这个机制,我们使用请求框架,它在API级别上由订阅.请求(n) 方法。在订阅[onSubscribe(subscription s)]开始时,我们请求5个对象,然后在onNext(Payload)中计算接收到的项目。当所有预期的帧到达请求者时,我们正在请求接下来的5个对象–同样使用subscription.request(n)方法。该用户的流程如下图所示:

响应式服务通信协议RSocket
本节介绍的背压机制的实现是非常基本的。在生产中,我们应该基于更精确的指标(例如预测/平均计算时间)提供更复杂的解决方案。毕竟,背压机制并不能使生产过剩的反应器问题消失。它将问题转移到响应方,以便更好地处理。

小结

在本文中,我们将讨论微服务体系结构中的通信问题,以及如何使用RSocket解决这些问题。我们用简单的“helloworld”示例和基本的背压机制实现介绍了它的API和交互模型。

本文完整代码示例:https://github.com/b3rnoulli/rsocket-examples

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/1820.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册