3年前 (2021-09-26)  相关技术 |   抢沙发  4014 
文章评分 0 次,平均分 0.0
[收起] 文章目录

gRPC:Java中的同步和异步unary RPC
gRPC是用于微服务间通信的远程过程调用(RPC)框架。gRPC支持四种类型的RPC:

1. Unary RPC:客户端发送单个请求并接收单个响应。

2. Server streaming RPC(服务器流式):客户端发送单个请求,作为回报,服务器发送消息流。

3. Client streaming RPC(客户端流式):客户端发送消息流,服务器以单个消息响应。

4. Bidirectional streaming RPC(双向流):在双向流中,客户端和服务器都发送消息流。

此外,gRPC unary RPC可以是同步的或异步的。

  • 同步:客户端调用等待服务器响应。
  • 异步:客户端对服务器进行非阻塞调用,服务器异步返回响应。

在本文中,我们将看到如何在java中实现gRPC同步和异步unaryRPC。

让我们开始:

gRPC中的unary RPC是什么?

在gRPC unary RPC中,客户端发送单个请求并接收单个响应。

gRPC:Java中的同步和异步unary RPC

与许多RPC框架一样,gRPC基于定义远程服务的思想,指定可以调用哪些远程方法及其参数和返回类型。在gRPC中,我们以.proto文件中的协议缓冲区(protobuf)格式定义远程服务和方法。

例如,让我们考虑一个远程服务产品服务,它公开了远程方法来创建和查询产品信息。我们可以定义unary RPC,以获取给定id的产品信息,如下所示:

syntax = "proto3";
package dev.techdozo.product.api;

service ProductService {
  // A simple RPC.
  // returns Product for a given id
  rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}

message GetProductRequest {
  string product_id = 1;
}

message GetProductResponse {
  Product product = 1;
}

message Product {
  string name = 1;
  string description = 2;
  double price = 3;
}

让我们理解上面的protobuf定义。

服务:服务是服务器公开的远程方法的逻辑集合。在protobuf中,我们使用service关键字定义服务。同样,我们可以通过使用RPC关键字来定义RPC方法is。例如,rpc GetProduct(GetProductRequest)returns(GetProductResponse)定义了一个rpc方法,该方法接受消息GetProductRequest并返回GetProductResponse

protoc编译器获取protobuf定义文件(.proto)并生成客户端和服务器存根。

消息:消息是在客户端和服务器之间交换的二进制数据结构。字段编号(如name=1)用于标识二进制编码数据中的字段。

为了简化,GetProduct RPC获取产品id并返回产品。

Protobuf代码生成

protoc编译器支持多种不同语言的代码生成,用于生成客户机和服务器代码。在本例中,我们将使用protobuf gradle插件以java生成客户机和服务器代码。

protocolbuffer插件组装protobuf编译器(protoc)命令行,并使用它从.proto文件生成Java源文件。生成的java源文件应添加到源集中,以便可以与java源一起编译。

sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

运行命令gradlew buildbuild/generated/source/proto/main/grpc和build/generated/source/proto/main/java目录中生成源代码。

通常,在gRPC中,客户端和服务器共享相同的原型文件。因此,当您运行gradlew build时,编译器会为客户端和服务器生成存根。

gRPC RPC调用流

在gRPC中,服务器实现了一组可以远程调用的方法/函数。

gRPC:Java中的同步和异步unary RPC

在典型的gRPC调用中,将执行以下步骤。

1. 这一切都从客户端工作流启动RPC调用开始。

2. 一旦RPC启动,客户端存根将以二进制格式对消息进行编码。然后,它用编码的消息创建一个HTTP POST请求。

3. 之后,编码的消息通过通道发送。gRPC通道提供到指定主机和端口上gRPC服务器的连接。

4. 在服务器端,服务器将编码的消息移交给自动生成的服务器存根。

5. 收到消息后,服务器存根将消息反序列化为特定于语言的数据结构。

6. 最后,服务器存根调用重写的服务方法并传递解析的消息。

类似地,在往返过程中,来自服务器的响应被编码并发送回客户端。

代码示例

gRPC:Java中的同步和异步unary RPC

实现服务端代码

gRPC服务器实现在proto文件中定义的服务和rpc方法,并将它们公开为rpc方法,运行.\gradlew clean build后,Gradle protobuf plugin在build/generated/source/proto/目录中生成一个服务器存根。

gRPC:Java中的同步和异步unary RPC

要启动gRPC服务器并注册服务,可以通过调用ServerBuilder作为ServerBuilder.forPort(port).addService(new ProductService()).build()来创建gRPC服务器实例。

在上述代码中,ProductService提供了protobuf中定义的gRPC服务的实现,如下所示:

service ProductService {
  rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}

启动服务器和注册服务的完整代码:

public class GrpcServer {

  private final int port;
  private final Server server;

  public GrpcServer(int port) {
    this.port = port;
    this.server = ServerBuilder.forPort(port).addService(new ProductService()).build();
  }

  public void start() throws IOException {
    server.start();
    log.info("Server Started on port {} ", port);
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  try {
                    this.stop();
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                }));
  }

  private void stop() throws InterruptedException {
    if (server != null) {
      server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutDown() throws InterruptedException {
    if (this.server != null) {
      server.awaitTermination();
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    var productServer = new GrpcServer(8081);
    productServer.start();
    productServer.blockUntilShutDown();
  }
}

实现服务定义

实现RPC方法rpc GetProduct(GetProductRequest) returns (GetProductResponse)有四个主要步骤:

1. 实现类,比如ProductService,它从自动生成的抽象类ProductServiceGrpc.ProductServiceImplBase扩展而来。

2. 重写getProduct方法并实现业务逻辑。

3. 业务逻辑调用完成后,调用responseObserver.onNext(getProductResponse)将响应传递回客户端。最后,调用responseObserver.onCompleted()

4. 如果出现错误,请调用responseObserver.onError(new StatusException(Status.NOT_FOUND))

我们在这里试图简化错误处理,但您必须知道,在gRPC中处理错误并不是很简单。

服务实现:

public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {

  private final ProductRepository productRepository;

  public ProductService() {
    this.productRepository = new ProductRepository();
  }

  @Override
  public void getProduct(
      GetProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
    var productId = request.getProductId();
    //Fetch Product information from repository
    Optional<Product> optionalProduct = productRepository.get(productId);

    if (optionalProduct.isPresent()) {
      var product = optionalProduct.get();
      //If found build response
      var productResponse =
          Service.Product.newBuilder()
              .setName(product.getName())
              .setDescription(product.getDescription())
              .setPrice(product.getPrice())
              .build();
      var getProductResponse = GetProductResponse.newBuilder().setProduct(productResponse).build();

      responseObserver.onNext(getProductResponse);
      responseObserver.onCompleted();
    } else {
      responseObserver.onError(new StatusException(Status.NOT_FOUND));
    }
    log.info("Finished calling Product API service..");
  }
}

运行gRPC服务器

由于此应用程序仅用于理解概念,因此我们可以从IDE运行gRPC服务器开始。为此,可以运行dev.techdozo.product.GrpcServer的主方法。但是,对于生产环境,您可能希望将服务器部署为容器或独立应用程序。

实现客户端代码

在客户端,首先需要使用proto文件生成客户端存根。生成客户端存根后,您需要实现一个通道。通道表示与服务器的连接。创建通道后,您需要创建一个阻塞或非阻塞客户端存根,然后您可以调用传递请求消息的服务器。

实施gRPC客户端通道

您可以创建一个gRPC通道,将服务器地址和端口指定为ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()。通道表示到要执行RPC的端点的虚拟连接。

创建频道的成本很高,因此请确保只创建一次频道并重用它。

您可以使用新创建的通道创建客户端存根,如下所示:

var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();

gRPC支持两种类型的客户端存根:

  • 阻塞/同步存根:在此存根中,RPC调用等待服务器响应。
  • 非阻塞/异步存根:客户端对服务器进行非阻塞调用,在服务器上异步返回响应。

实现阻塞客户端存根

要创建同步/阻塞客户端存根,请使用ProductServiceGrpcnewBlockingStub静态方法。

var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();

var productResponse = productServiceBlockingStub.getProduct(productRequest);

要运行阻塞客户端示例,请从IDE运行类dev.techdozo.order.client.UnaryGrpcBlockingClientmain方法。同时,确保gRPC服务器正在运行。运行后,客户端将打印日志,如:

[INFO ] 2021-09-07 20:32:53.899 [main] UnaryGrpcBlockingClient - Calling Server..
[INFO ] 2021-09-07 20:32:56.485 [main] UnaryGrpcBlockingClient - Received Product from server, info product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

从日志可以推断,请求和响应都在同一个线程[main]中。换句话说,客户机阻塞,直到服务器返回响应。

实现异步客户端存根

对于大多数用例,阻塞操作就足够了。但是,正如您所看到的,阻塞RPC会等待服务器返回响应,从而浪费CPU周期。异步客户端存根通过注册回调来解决此问题。一旦服务器发送响应,就会在另一个线程中调用此回调。同时,客户可以继续做其他工作。

要实现异步客户端存根,请使用ProductServiceGrpcnewStub静态方法。

var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);

并将回调注册为:

var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
productServiceAsyncStub.getProduct(productRequest, new ProductCallback());

其中回调定义为:

class ProductCallback implements StreamObserver<GetProductResponse> {

  @Override
  public void onNext(GetProductResponse value) {
    log.info("Received product, {}", value);
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}

要运行异步客户端示例,请从IDE运行类dev.techdozo.order.client.unarygrpcasyncClientmain方法。运行后,客户端将打印日志,如:

[INFO ] 2021-09-10 15:05:40.188 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-09-10 15:05:42.300 [grpc-default-executor-0] UnaryGrpcAsynClient - Received product, product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

[INFO ] 2021-09-10 15:05:42.301 [grpc-default-executor-0] UnaryGrpcAsynClient - Stream completed

您是否注意到回调发生在与主线程不同的线程grpc-default-executor-0中?

对于回调,gRPC使用一个缓存的线程池,该线程池根据需要创建新线程,但在以前构造的线程可用时将重用这些线程。如果需要,您可以提供自己的线程池,如下所示:

var executorService = Executors.newFixedThreadPool(10);
var managedChannel =
    ManagedChannelBuilder.forAddress(host, port)
        .executor(executorService)
        .usePlaintext()
        .build();

实现异步 future 存根

另一个异步RPC选项是使用未来存根。要定义future sub,请将ProductServiceGrpc的静态方法newFutureStub(Channel) 调用为:

// Create a new future stub
var productServiceFutureStub = ProductServiceGrpc.newFutureStub(managedChannel);

与异步存根一样,您可以使用Futures.addCallback(..)注册回调,如下所示:

var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
ListenableFuture<GetProductResponse> listenableFuture =
  productServiceFutureStub.getProduct(productRequest);
Futures.addCallback(listenableFuture, new ProductCallback(), fixedThreadPool);

或者,您可以将runnable注册为:

listenableFuture.addListener(this::notifyListener, fixedThreadPool);

与期货不同,Runnable不会返回任何东西。因此,当RPC方法返回空响应并且您希望通知订阅者时,这非常有用。

例如,在删除产品的情况下,您可能希望使用ProductDeleted事件通知订户。

rpc DeleteProduct(DeleteProductRequest) returns (google.protobuf.Empty);

要运行异步客户端示例,请从IDE运行类dev.techdozo.order.client.UnaryGrpFutureClientmain方法。运行后,客户端将打印日志,如:

[INFO ] 2021-09-10 15:15:37.754 [main] UnaryGrpcFutureClient - Calling Server..
[INFO ] 2021-09-10 15:15:39.474 [pool-2-thread-1] UnaryGrpcFutureClient - Received product f1 product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

总结

gRPC是一个远程过程调用(RPC)框架,用于微服务间的通信。gRPC支持unary RPC和流式RPC。在gRPC unaryRPC中,客户端发送单个请求并接收单个响应。此外,gRPC中的RPC可以是同步的,也可以是异步的。在同步RPC中,客户端调用等待服务器响应。顾名思义,在异步RPC中,服务器异步返回响应。

原文地址:https://techdozo.dev/grpc-synchronous-and-asynchronous-unary-rpc-in-java/

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2382.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册