gRPC是用于微服务间通信的远程过程调用(RPC)框架。gRPC支持四种类型的RPC:
1. Unary RPC:客户端发送单个请求并接收单个响应。
2. Server streaming RPC(服务器流式):客户端发送单个请求,作为回报,服务器发送消息流。
3. Client streaming RPC(客户端流式):客户端发送消息流,服务器以单个消息响应。
4. Bidirectional streaming RPC(双向流):在双向流中,客户端和服务器都发送消息流。
此外,gRPC unary RPC可以是同步的或异步的。
- 同步:客户端调用等待服务器响应。
- 异步:客户端对服务器进行非阻塞调用,服务器异步返回响应。
在本文中,我们将看到如何在java中实现gRPC同步和异步unaryRPC。
让我们开始:
gRPC中的unary RPC是什么?
在gRPC unary RPC中,客户端发送单个请求并接收单个响应。
与许多RPC框架一样,gRPC基于定义远程服务的思想,指定可以调用哪些远程方法及其参数和返回类型。在gRPC中,我们以.proto
文件中的协议缓冲区(protobuf)格式定义远程服务和方法。
例如,让我们考虑一个远程服务产品服务,它公开了远程方法来创建和查询产品信息。我们可以定义unary RPC,以获取给定id的产品信息,如下所示:
syntax = "proto3";
package dev.techdozo.product.api;
service ProductService {
// A simple RPC.
// returns Product for a given id
rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}
message GetProductRequest {
string product_id = 1;
}
message GetProductResponse {
Product product = 1;
}
message Product {
string name = 1;
string description = 2;
double price = 3;
}
让我们理解上面的protobuf定义。
服务:服务是服务器公开的远程方法的逻辑集合。在protobuf中,我们使用service
关键字定义服务。同样,我们可以通过使用RPC关键字来定义RPC方法is
。例如,rpc GetProduct(GetProductRequest)returns(GetProductResponse)
定义了一个rpc方法,该方法接受消息GetProductRequest
并返回GetProductResponse
。
protoc编译器获取protobuf定义文件(.proto
)并生成客户端和服务器存根。
消息:消息是在客户端和服务器之间交换的二进制数据结构。字段编号(如name=1
)用于标识二进制编码数据中的字段。
为了简化,GetProduct RPC获取产品id并返回产品。
Protobuf代码生成
protoc编译器支持多种不同语言的代码生成,用于生成客户机和服务器代码。在本例中,我们将使用protobuf gradle插件以java生成客户机和服务器代码。
protocolbuffer插件组装protobuf编译器(protoc
)命令行,并使用它从.proto
文件生成Java源文件。生成的java源文件应添加到源集中,以便可以与java源一起编译。
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
运行命令gradlew build
在build/generated/source/proto/main/grpc和build/generated/source/proto/main/java
目录中生成源代码。
通常,在gRPC中,客户端和服务器共享相同的原型文件。因此,当您运行gradlew build时,编译器会为客户端和服务器生成存根。
gRPC RPC调用流
在gRPC中,服务器实现了一组可以远程调用的方法/函数。
在典型的gRPC调用中,将执行以下步骤。
1. 这一切都从客户端工作流启动RPC调用开始。
2. 一旦RPC启动,客户端存根将以二进制格式对消息进行编码。然后,它用编码的消息创建一个HTTP POST请求。
3. 之后,编码的消息通过通道发送。gRPC通道提供到指定主机和端口上gRPC服务器的连接。
4. 在服务器端,服务器将编码的消息移交给自动生成的服务器存根。
5. 收到消息后,服务器存根将消息反序列化为特定于语言的数据结构。
6. 最后,服务器存根调用重写的服务方法并传递解析的消息。
类似地,在往返过程中,来自服务器的响应被编码并发送回客户端。
代码示例
实现服务端代码
gRPC服务器实现在proto文件中定义的服务和rpc方法,并将它们公开为rpc方法,运行.\gradlew clean build
后,Gradle protobuf plugin在build/generated/source/proto/
目录中生成一个服务器存根。
要启动gRPC服务器并注册服务,可以通过调用ServerBuilder
作为ServerBuilder.forPort(port).addService(new ProductService()).build()
来创建gRPC服务器实例。
在上述代码中,ProductService
提供了protobuf中定义的gRPC服务的实现,如下所示:
service ProductService {
rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}
启动服务器和注册服务的完整代码:
public class GrpcServer {
private final int port;
private final Server server;
public GrpcServer(int port) {
this.port = port;
this.server = ServerBuilder.forPort(port).addService(new ProductService()).build();
}
public void start() throws IOException {
server.start();
log.info("Server Started on port {} ", port);
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
try {
this.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutDown() throws InterruptedException {
if (this.server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
var productServer = new GrpcServer(8081);
productServer.start();
productServer.blockUntilShutDown();
}
}
实现服务定义
实现RPC方法rpc GetProduct(GetProductRequest) returns (GetProductResponse)
有四个主要步骤:
1. 实现类,比如ProductService
,它从自动生成的抽象类ProductServiceGrpc.ProductServiceImplBase
扩展而来。
2. 重写getProduct
方法并实现业务逻辑。
3. 业务逻辑调用完成后,调用responseObserver.onNext(getProductResponse)
将响应传递回客户端。最后,调用responseObserver.onCompleted()
。
4. 如果出现错误,请调用responseObserver.onError(new StatusException(Status.NOT_FOUND))
。
我们在这里试图简化错误处理,但您必须知道,在gRPC中处理错误并不是很简单。
服务实现:
public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {
private final ProductRepository productRepository;
public ProductService() {
this.productRepository = new ProductRepository();
}
@Override
public void getProduct(
GetProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
var productId = request.getProductId();
//Fetch Product information from repository
Optional<Product> optionalProduct = productRepository.get(productId);
if (optionalProduct.isPresent()) {
var product = optionalProduct.get();
//If found build response
var productResponse =
Service.Product.newBuilder()
.setName(product.getName())
.setDescription(product.getDescription())
.setPrice(product.getPrice())
.build();
var getProductResponse = GetProductResponse.newBuilder().setProduct(productResponse).build();
responseObserver.onNext(getProductResponse);
responseObserver.onCompleted();
} else {
responseObserver.onError(new StatusException(Status.NOT_FOUND));
}
log.info("Finished calling Product API service..");
}
}
运行gRPC服务器
由于此应用程序仅用于理解概念,因此我们可以从IDE运行gRPC服务器开始。为此,可以运行dev.techdozo.product.GrpcServer
的主方法。但是,对于生产环境,您可能希望将服务器部署为容器或独立应用程序。
实现客户端代码
在客户端,首先需要使用proto文件生成客户端存根。生成客户端存根后,您需要实现一个通道。通道表示与服务器的连接。创建通道后,您需要创建一个阻塞或非阻塞客户端存根,然后您可以调用传递请求消息的服务器。
实施gRPC客户端通道
您可以创建一个gRPC通道,将服务器地址和端口指定为ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
。通道表示到要执行RPC的端点的虚拟连接。
创建频道的成本很高,因此请确保只创建一次频道并重用它。
您可以使用新创建的通道创建客户端存根,如下所示:
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
gRPC支持两种类型的客户端存根:
- 阻塞/同步存根:在此存根中,RPC调用等待服务器响应。
- 非阻塞/异步存根:客户端对服务器进行非阻塞调用,在服务器上异步返回响应。
实现阻塞客户端存根
要创建同步/阻塞客户端存根,请使用ProductServiceGrpc
的newBlockingStub
静态方法。
var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
var productResponse = productServiceBlockingStub.getProduct(productRequest);
要运行阻塞客户端示例,请从IDE运行类dev.techdozo.order.client.UnaryGrpcBlockingClient
的main
方法。同时,确保gRPC服务器正在运行。运行后,客户端将打印日志,如:
[INFO ] 2021-09-07 20:32:53.899 [main] UnaryGrpcBlockingClient - Calling Server..
[INFO ] 2021-09-07 20:32:56.485 [main] UnaryGrpcBlockingClient - Received Product from server, info product {
name: "Apple iPhone 12 Pro (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Graphite"
price: 1617.29
}
从日志可以推断,请求和响应都在同一个线程[main
]中。换句话说,客户机阻塞,直到服务器返回响应。
实现异步客户端存根
对于大多数用例,阻塞操作就足够了。但是,正如您所看到的,阻塞RPC会等待服务器返回响应,从而浪费CPU周期。异步客户端存根通过注册回调来解决此问题。一旦服务器发送响应,就会在另一个线程中调用此回调。同时,客户可以继续做其他工作。
要实现异步客户端存根,请使用ProductServiceGrpc
的newStub
静态方法。
var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);
并将回调注册为:
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
productServiceAsyncStub.getProduct(productRequest, new ProductCallback());
其中回调定义为:
class ProductCallback implements StreamObserver<GetProductResponse> {
@Override
public void onNext(GetProductResponse value) {
log.info("Received product, {}", value);
}
@Override
public void onError(Throwable cause) {
log.error("Error occurred, cause {}", cause.getMessage());
}
@Override
public void onCompleted() {
log.info("Stream completed");
}
}
要运行异步客户端示例,请从IDE运行类dev.techdozo.order.client.unarygrpcasyncClient
的main
方法。运行后,客户端将打印日志,如:
[INFO ] 2021-09-10 15:05:40.188 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-09-10 15:05:42.300 [grpc-default-executor-0] UnaryGrpcAsynClient - Received product, product {
name: "Apple iPhone 12 Pro (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Graphite"
price: 1617.29
}
[INFO ] 2021-09-10 15:05:42.301 [grpc-default-executor-0] UnaryGrpcAsynClient - Stream completed
您是否注意到回调发生在与主线程不同的线程grpc-default-executor-0
中?
对于回调,gRPC使用一个缓存的线程池,该线程池根据需要创建新线程,但在以前构造的线程可用时将重用这些线程。如果需要,您可以提供自己的线程池,如下所示:
var executorService = Executors.newFixedThreadPool(10);
var managedChannel =
ManagedChannelBuilder.forAddress(host, port)
.executor(executorService)
.usePlaintext()
.build();
实现异步 future 存根
另一个异步RPC选项是使用未来存根。要定义future sub
,请将ProductServiceGrpc
的静态方法newFutureStub(Channel)
调用为:
// Create a new future stub
var productServiceFutureStub = ProductServiceGrpc.newFutureStub(managedChannel);
与异步存根一样,您可以使用Futures.addCallback(..)
注册回调,如下所示:
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
ListenableFuture<GetProductResponse> listenableFuture =
productServiceFutureStub.getProduct(productRequest);
Futures.addCallback(listenableFuture, new ProductCallback(), fixedThreadPool);
或者,您可以将runnable
注册为:
listenableFuture.addListener(this::notifyListener, fixedThreadPool);
与期货不同,Runnable
不会返回任何东西。因此,当RPC方法返回空响应并且您希望通知订阅者时,这非常有用。
例如,在删除产品的情况下,您可能希望使用ProductDeleted
事件通知订户。
rpc DeleteProduct(DeleteProductRequest) returns (google.protobuf.Empty);
要运行异步客户端示例,请从IDE运行类dev.techdozo.order.client.UnaryGrpFutureClient
的main
方法。运行后,客户端将打印日志,如:
[INFO ] 2021-09-10 15:15:37.754 [main] UnaryGrpcFutureClient - Calling Server..
[INFO ] 2021-09-10 15:15:39.474 [pool-2-thread-1] UnaryGrpcFutureClient - Received product f1 product {
name: "Apple iPhone 12 Pro (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Graphite"
price: 1617.29
}
总结
gRPC是一个远程过程调用(RPC)框架,用于微服务间的通信。gRPC支持unary RPC和流式RPC。在gRPC unaryRPC中,客户端发送单个请求并接收单个响应。此外,gRPC中的RPC可以是同步的,也可以是异步的。在同步RPC中,客户端调用等待服务器响应。顾名思义,在异步RPC中,服务器异步返回响应。
原文地址:https://techdozo.dev/grpc-synchronous-and-asynchronous-unary-rpc-in-java/
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2382.html
暂无评论