在本文中,您将学习如何使用Kafka和Quarkus在Knative上运行事件应用程序。之前,我描述了Kafka和Spring Cloud的相同方法。我们将部署完全相同的体系结构。然而,我们将使用Quarkus Funqy代替SpringCloud函数。此外,Spring Cloud流可能会被Quarkus Kafka所取代。在我们开始之前,让我们澄清一些事情。
Knative 和 Quarkus概念
Quarkus以几种方式支持Knative。首先,我们可以使用Quarkus Kubernetes模块简化在Knative上的部署。我们还可以使用Quarkus Funqy Knative事件扩展在函数中路由和处理云事件。还不止这些。Quarkus支持serverless无服务器功能样式。使用Quarkus Funqy模块,我们可以编写可部署到各种FAA(包括Knative)的函数。这些函数可以通过HTTP调用。最后,我们可以使用来自Quarkus Kafka扩展的注释将我们的应用程序与Kafka主题集成。
Quarkus Funqy Knative事件模块基于Knative代理和触发器。因为我们将使用Kafka源代码而不是代理和触发器,所以我们不包括该模块。然而,我们仍然可以利用Quarkus Funqy和HTTP绑定。
源代码
如果您想自己尝试,您可以随时查看我的源代码:https://github.com/piomin/sample-quarkus-serverless-kafka.git,为此,您需要克隆我的GitHub存储库。那你应该按照我的指示去做。
我们将实现一个最终的一致性模式(也称为SAGA模式)。它是如何工作的?示例系统由三个服务组成。订单服务order-service
创建与客户和产品相关的新订单。该命令被发送到Kafka主题。然后,我们的另外两个应用程序客户服务customer-service
和产品服务product-service
接收订单事件。之后,他们进行预订。客户服务在客户账户上保留订单金额。同时,产品服务部保留订单中指定的一些产品。这两个服务都通过Kafka主题向订单服务发送响应。如果订单服务收到来自两个服务的肯定预订,则确认订单。然后,它发送一个包含该信息的事件。客户服务和产品服务均接收活动并确认预订。您可以在下图中进行验证。
先决条件
在开始之前,我们需要遵守几个要求:
1. 至少具有1.17版本的Kubernetes群集。我正在使用本地群集。如果使用远程群集,请将映像名中的dev.local
替换为Docker帐户名
2. 在集群上安装Knative服务和事件。您可以在这里(https://knative.dev/docs/install/any-kubernetes-cluster/)找到详细的安装说明。
3. 安装Kafka事件代理。这是发布网站的链接:https://github.com/knative-sandbox/eventing-kafka-broker/releases。您不需要所有东西–我们将使用KafkaSource和Kafkabind CRD
4. 使用Strimzi操作器安装Kafka群集。我把它安装在卡夫卡的命名空间中。我的群集的名称是我的群集。
步骤1. 安装Kafka Knative组件
假设您已经安装了在Kubernetes集群上运行Knative Eventing所需的所有元素,我们可能会创建一些专用于应用程序的组件。您可以在每个应用程序目录的k8s目录中找到带有对象声明的YAML清单。首先,让我们创建一个KafkaBinding。它负责将Kafka集群的地址注入到应用程序容器中。由于kafkabind,该地址在容器中作为KAFKA_BOOTSTRAP_SERVERS
环境变量可见。下面是customer-saga
应用程序的YAML声明示例。我们应该为另外两个应用程序创建类似的对象。
apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
name: kafka-binding-customer-saga
spec:
subject:
apiVersion: serving.knative.dev/v1
kind: Service
name: customer-saga
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
在下一步中,我们将创建KafkaSource对象。它从特定主题读取事件并将其传递给使用者。它调用应用程序公开的HTTP POST端点。我们可以覆盖HTTP端点的默认上下文路径。对于客户传奇,目标URL为/reserve。它应该从order-events
主题接收事件。因为customer-saga
和product-saga
都会侦听order-events
主题中的事件,所以我们也需要为product-saga
创建一个类似的KafkaSource
对象。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source-orders-customer
spec:
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
topics:
- order-events
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: customer-saga
uri: /reserve
另一方面,order-saga
侦听reserve-events
主题上的事件。如果您想再次验证我们的场景,请参考源代码部分中的图表。这次目标URL为/confirm
。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source-orders-confirm
spec:
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
topics:
- reserve-events
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: order-saga
uri: /confirm
让我们来验证Kafka的来源列表。在我们的例子中,每个应用程序只有一个KafkaSource
。在将我们的Quarkus应用程序部署到Knative上之前,您的Kafka源尚未准备就绪。
步骤2. 将Quarkus与Kafka集成
为了将Quarkus与Apache Kafka集成,我们可以使用SmallRye反应式消息传递库。因此,我们可以使用注释为每个方法定义输入和输出主题。消息被序列化为JSON。我们还可以在运行状况检查中自动公开Kafka连接状态。下面是我们需要包含在Maven pom.xml
中的依赖项列表。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>
在开始编写源代码之前,我们需要在application.properties
文件中提供一些配置设置。当然,Kafka需要到集群的连接URL。我们使用Kafkabind
对象注入的环境变量。此外,还应配置输出主题名称。下面是order-saga
应用程序所需属性的列表。
kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}
mp.messaging.outgoing.order-events.connector = smallrye-kafka
mp.messaging.outgoing.order-events.topic = order-events
mp.messaging.outgoing.order-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
最后,我们可以切换到代码。让我们从order-saga
开始。它将持续向order-events
主题发送订单。customer-saga
和product-saga
应用程序都会收到这些事件。负责生成和发送事件的方法使用Mutiny Multi返回反应流。它每秒发送一个事件。我们需要使用@Outgoing
注释对方法进行注释,该注释传递应用程序属性中定义的输出名称。另外,@Broadcast
注释表示事件已分派给所有订阅者。在发送之前,每个订单都需要保存在一个数据库中(我们使用H2内存数据库)。
@ApplicationScoped
@Slf4j
public class OrderPublisher {
private static int num = 0;
@Inject
private OrderRepository repository;
@Inject
private UserTransaction transaction;
@Outgoing("order-events")
@Broadcast
public Multi<Order> orderEventsPublish() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(tick -> {
Order o = new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW);
try {
transaction.begin();
repository.persist(o);
transaction.commit();
} catch (Exception e) {
log.error("Error in transaction", e);
}
log.info("Order published: {}", o);
return o;
});
}
}
步骤3. 使用Quarkus Funqy处理Knative事件
好的,在前面的步骤中,我们已经实现了负责将事件发送到Kafka主题的部分代码。我们还有KafkaSource
,它负责将事件从Kafka主题发送到应用程序HTTP端点。现在,我们只需要处理它们。Quarkus Funqy非常简单。它允许我们根据无服务器Faas方法创建函数。但我们也可以使用Quarkus Funqy HTTP扩展轻松地将每个函数绑定到HTTP端点。让我们将其包含在依赖项中。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-funqy-http</artifactId>
</dependency>
为了用Quarkus Funqy创建函数,我们只需要用@Funq
注释特定的方法。该方法的名称为reserve
,因此它会自动绑定到HTTP端点POST/reserve
。它接受一个表示传入订单的输入参数。它是从JSON自动反序列化的。
在下面可见的代码片段中,我们在customer-saga
应用程序中实现订单处理。一旦收到订单,它将对客户帐户执行预订。然后它需要发送一个对order-saga
的响应。为此,我们可以再次使用Quarkus反应式消息传递支持。我们定义发射器对象,它允许我们向主题发送单个事件。我们可以在一个方法中使用,该方法不返回任何应该发送到主题的输出(带有@Outgoing
)。发射器bean应该用@Channel
注释。它的工作原理类似于@Outgoing
。我们还需要定义与频道名称相关的输出主题。
@Slf4j
public class OrderReserveFunction {
@Inject
private CustomerRepository repository;
@Inject
@Channel("reserve-events")
Emitter<Order> orderEmitter;
@Funq
public void reserve(Order order) {
log.info("Received order: {}", order);
doReserve(order);
}
private void doReserve(Order order) {
Customer customer = repository.findById(order.getCustomerId());
log.info("Customer: {}", customer);
if (order.getStatus() == OrderStatus.NEW) {
customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
order.setStatus(OrderStatus.IN_PROGRESS);
log.info("Order reserved: {}", order);
orderEmitter.send(order);
} else if (order.getStatus() == OrderStatus.CONFIRMED) {
customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
}
repository.persist(customer);
}
}
以下是Kafka和Emitter
之间集成的配置属性。应为customer-saga
和product-saga
创建相同的配置属性。
kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}
mp.messaging.outgoing.reserve-events.connector = smallrye-kafka
mp.messaging.outgoing.reserve-events.topic = reserve-events
mp.messaging.outgoing.reserve-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
最后,让我们看一下product-saga
应用程序中Quarkus函数的实现。它还使用Emitter
对象向reserve-events
主题发送响应。它处理传入订单,并为请求的产品数量执行预订。
@Slf4j
public class OrderReserveFunction {
@Inject
private ProductRepository repository;
@Inject
@Channel("reserve-events")
Emitter<Order> orderEmitter;
@Funq
public void reserve(Order order) {
log.info("Received order: {}", order);
doReserve(order);
}
private void doReserve(Order order) {
Product product = repository.findById(order.getProductId());
log.info("Product: {}", product);
if (order.getStatus() == OrderStatus.NEW) {
product.setReservedItems(product.getReservedItems() + order.getProductsCount());
product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
order.setStatus(OrderStatus.IN_PROGRESS);
orderEmitter.send(order);
} else if (order.getStatus() == OrderStatus.CONFIRMED) {
product.setReservedItems(product.getReservedItems() - order.getProductsCount());
}
repository.persist(product);
}
}
步骤4. 在Knative上部署Quarkus应用程序
最后,我们可以在Knative上部署所有应用程序。为了简化该过程,我们可以使用Quarkus Kubernetes支持。它能够根据源代码和应用程序属性自动生成部署清单。Quarkus还支持使用Jib构建图像。首先,让我们向Maven pom.xml
添加以下依赖项。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-jib</artifactId>
</dependency>
在下一步中,我们需要向application.properties
文件添加一些配置设置。要在Kubernetes上启用自动部署,属性quarkus.Kubernetes.deploy
必须设置为true
。然后,我们应该将目标平台更改为Knative。由于Quarkus将生成Knative服务,而不是标准的Kubernetes部署。最后一个属性quarkus.container-image.group
负责设置映像所有者组的名称。对于Knative的本地开发,我们应该在那里设置dev.local
值。
quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local
在设置了上面所有可见的值之后,我们只需要执行Maven build来部署应用程序。
$ mvn clean package
在为所有应用程序运行Maven build之后,让我们验证一个Knative服务列表。
一旦order-saga
应用程序启动,它就开始连续发送订单。它还接收customer-saga
和product-saga
发送的订单事件。这些事件由Quarkus函数处理。这是由order-saga
打印的日志。
最后的想法
如您所见,我们可以轻松地在Knative上实现和部署Quarkus应用程序。Quarkus提供了几个扩展,简化了与Knative事件模型和Kafka代理的集成。我们可以使用Quarkus Funqy实现serverless无服务器FaaS方法,或者使用SmallRye反应式消息传递与Apache Kafka集成
原文地址:https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2221.html
暂无评论