3年前 (2021-08-13)  相关技术 |   抢沙发  547 
文章评分 0 次,平均分 0.0
[收起] 文章目录

基于Kafka和Quarkus的Knative Eventing应用
在本文中,您将学习如何使用Kafka和Quarkus在Knative上运行事件应用程序。之前,我描述了Kafka和Spring Cloud的相同方法。我们将部署完全相同的体系结构。然而,我们将使用Quarkus Funqy代替SpringCloud函数。此外,Spring Cloud流可能会被Quarkus Kafka所取代。在我们开始之前,让我们澄清一些事情。

Knative 和 Quarkus概念

Quarkus以几种方式支持Knative。首先,我们可以使用Quarkus Kubernetes模块简化在Knative上的部署。我们还可以使用Quarkus Funqy Knative事件扩展在函数中路由和处理云事件。还不止这些。Quarkus支持serverless无服务器功能样式。使用Quarkus Funqy模块,我们可以编写可部署到各种FAA(包括Knative)的函数。这些函数可以通过HTTP调用。最后,我们可以使用来自Quarkus Kafka扩展的注释将我们的应用程序与Kafka主题集成。

Quarkus Funqy Knative事件模块基于Knative代理和触发器。因为我们将使用Kafka源代码而不是代理和触发器,所以我们不包括该模块。然而,我们仍然可以利用Quarkus Funqy和HTTP绑定。

源代码

如果您想自己尝试,您可以随时查看我的源代码:https://github.com/piomin/sample-quarkus-serverless-kafka.git,为此,您需要克隆我的GitHub存储库。那你应该按照我的指示去做。

我们将实现一个最终的一致性模式(也称为SAGA模式)。它是如何工作的?示例系统由三个服务组成。订单服务order-service创建与客户和产品相关的新订单。该命令被发送到Kafka主题。然后,我们的另外两个应用程序客户服务customer-service和产品服务product-service接收订单事件。之后,他们进行预订。客户服务在客户账户上保留订单金额。同时,产品服务部保留订单中指定的一些产品。这两个服务都通过Kafka主题向订单服务发送响应。如果订单服务收到来自两个服务的肯定预订,则确认订单。然后,它发送一个包含该信息的事件。客户服务和产品服务均接收活动并确认预订。您可以在下图中进行验证。

基于Kafka和Quarkus的Knative Eventing应用

先决条件

在开始之前,我们需要遵守几个要求:

1. 至少具有1.17版本的Kubernetes群集。我正在使用本地群集。如果使用远程群集,请将映像名中的dev.local替换为Docker帐户名

2. 在集群上安装Knative服务和事件。您可以在这里(https://knative.dev/docs/install/any-kubernetes-cluster/)找到详细的安装说明。

3. 安装Kafka事件代理。这是发布网站的链接:https://github.com/knative-sandbox/eventing-kafka-broker/releases。您不需要所有东西–我们将使用KafkaSource和Kafkabind CRD

4. 使用Strimzi操作器安装Kafka群集。我把它安装在卡夫卡的命名空间中。我的群集的名称是我的群集。

步骤1. 安装Kafka Knative组件

假设您已经安装了在Kubernetes集群上运行Knative Eventing所需的所有元素,我们可能会创建一些专用于应用程序的组件。您可以在每个应用程序目录的k8s目录中找到带有对象声明的YAML清单。首先,让我们创建一个KafkaBinding。它负责将Kafka集群的地址注入到应用程序容器中。由于kafkabind,该地址在容器中作为KAFKA_BOOTSTRAP_SERVERS环境变量可见。下面是customer-saga应用程序的YAML声明示例。我们应该为另外两个应用程序创建类似的对象。

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

在下一步中,我们将创建KafkaSource对象。它从特定主题读取事件并将其传递给使用者。它调用应用程序公开的HTTP POST端点。我们可以覆盖HTTP端点的默认上下文路径。对于客户传奇,目标URL为/reserve。它应该从order-events主题接收事件。因为customer-sagaproduct-saga都会侦听order-events主题中的事件,所以我们也需要为product-saga创建一个类似的KafkaSource对象。

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /reserve

另一方面,order-saga侦听reserve-events主题上的事件。如果您想再次验证我们的场景,请参考源代码部分中的图表。这次目标URL为/confirm

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-confirm
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /confirm

让我们来验证Kafka的来源列表。在我们的例子中,每个应用程序只有一个KafkaSource。在将我们的Quarkus应用程序部署到Knative上之前,您的Kafka源尚未准备就绪。

基于Kafka和Quarkus的Knative Eventing应用

步骤2. 将Quarkus与Kafka集成

为了将Quarkus与Apache Kafka集成,我们可以使用SmallRye反应式消息传递库。因此,我们可以使用注释为每个方法定义输入和输出主题。消息被序列化为JSON。我们还可以在运行状况检查中自动公开Kafka连接状态。下面是我们需要包含在Maven pom.xml中的依赖项列表。

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

在开始编写源代码之前,我们需要在application.properties文件中提供一些配置设置。当然,Kafka需要到集群的连接URL。我们使用Kafkabind对象注入的环境变量。此外,还应配置输出主题名称。下面是order-saga应用程序所需属性的列表。

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.order-events.connector = smallrye-kafka
mp.messaging.outgoing.order-events.topic = order-events
mp.messaging.outgoing.order-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

最后,我们可以切换到代码。让我们从order-saga开始。它将持续向order-events主题发送订单。customer-sagaproduct-saga应用程序都会收到这些事件。负责生成和发送事件的方法使用Mutiny Multi返回反应流。它每秒发送一个事件。我们需要使用@Outgoing注释对方法进行注释,该注释传递应用程序属性中定义的输出名称。另外,@Broadcast注释表示事件已分派给所有订阅者。在发送之前,每个订单都需要保存在一个数据库中(我们使用H2内存数据库)。

@ApplicationScoped
@Slf4j
public class OrderPublisher {

   private static int num = 0;

   @Inject
   private OrderRepository repository;
   @Inject
   private UserTransaction transaction;

   @Outgoing("order-events")
   @Broadcast
   public Multi<Order> orderEventsPublish() {
      return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
           .map(tick -> {
              Order o = new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW);
              try {
                 transaction.begin();
                 repository.persist(o);
                 transaction.commit();
              } catch (Exception e) {
                 log.error("Error in transaction", e);
              }

              log.info("Order published: {}", o);
              return o;
           });
   }

}

步骤3. 使用Quarkus Funqy处理Knative事件

好的,在前面的步骤中,我们已经实现了负责将事件发送到Kafka主题的部分代码。我们还有KafkaSource,它负责将事件从Kafka主题发送到应用程序HTTP端点。现在,我们只需要处理它们。Quarkus Funqy非常简单。它允许我们根据无服务器Faas方法创建函数。但我们也可以使用Quarkus Funqy HTTP扩展轻松地将每个函数绑定到HTTP端点。让我们将其包含在依赖项中。

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-funqy-http</artifactId>
 </dependency>

为了用Quarkus Funqy创建函数,我们只需要用@Funq注释特定的方法。该方法的名称为reserve,因此它会自动绑定到HTTP端点POST/reserve。它接受一个表示传入订单的输入参数。它是从JSON自动反序列化的。

在下面可见的代码片段中,我们在customer-saga应用程序中实现订单处理。一旦收到订单,它将对客户帐户执行预订。然后它需要发送一个对order-saga的响应。为此,我们可以再次使用Quarkus反应式消息传递支持。我们定义发射器对象,它允许我们向主题发送单个事件。我们可以在一个方法中使用,该方法不返回任何应该发送到主题的输出(带有@Outgoing)。发射器bean应该用@Channel注释。它的工作原理类似于@Outgoing。我们还需要定义与频道名称相关的输出主题。

@Slf4j
public class OrderReserveFunction {

   @Inject
   private CustomerRepository repository;
   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId());
      log.info("Customer: {}", customer);
      if (order.getStatus() == OrderStatus.NEW) {
         customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         log.info("Order reserved: {}", order);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
      }
      repository.persist(customer);
   }
}

以下是Kafka和Emitter之间集成的配置属性。应为customer-sagaproduct-saga创建相同的配置属性。

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.reserve-events.connector = smallrye-kafka
mp.messaging.outgoing.reserve-events.topic = reserve-events
mp.messaging.outgoing.reserve-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

最后,让我们看一下product-saga应用程序中Quarkus函数的实现。它还使用Emitter对象向reserve-events主题发送响应。它处理传入订单,并为请求的产品数量执行预订。

@Slf4j
public class OrderReserveFunction {

   @Inject
   private ProductRepository repository;

   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Product product = repository.findById(order.getProductId());
      log.info("Product: {}", product);
      if (order.getStatus() == OrderStatus.NEW) {
         product.setReservedItems(product.getReservedItems() + order.getProductsCount());
         product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         product.setReservedItems(product.getReservedItems() - order.getProductsCount());
      }
      repository.persist(product);
   }
}

步骤4. 在Knative上部署Quarkus应用程序

最后,我们可以在Knative上部署所有应用程序。为了简化该过程,我们可以使用Quarkus Kubernetes支持。它能够根据源代码和应用程序属性自动生成部署清单。Quarkus还支持使用Jib构建图像。首先,让我们向Maven pom.xml添加以下依赖项。

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kubernetes</artifactId>
 </dependency>
 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-container-image-jib</artifactId>
 </dependency>

在下一步中,我们需要向application.properties文件添加一些配置设置。要在Kubernetes上启用自动部署,属性quarkus.Kubernetes.deploy必须设置为true。然后,我们应该将目标平台更改为Knative。由于Quarkus将生成Knative服务,而不是标准的Kubernetes部署。最后一个属性quarkus.container-image.group负责设置映像所有者组的名称。对于Knative的本地开发,我们应该在那里设置dev.local值。

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

在设置了上面所有可见的值之后,我们只需要执行Maven build来部署应用程序。

$ mvn clean package

在为所有应用程序运行Maven build之后,让我们验证一个Knative服务列表。

基于Kafka和Quarkus的Knative Eventing应用

一旦order-saga应用程序启动,它就开始连续发送订单。它还接收customer-sagaproduct-saga发送的订单事件。这些事件由Quarkus函数处理。这是由order-saga打印的日志。

基于Kafka和Quarkus的Knative Eventing应用

最后的想法

如您所见,我们可以轻松地在Knative上实现和部署Quarkus应用程序。Quarkus提供了几个扩展,简化了与Knative事件模型和Kafka代理的集成。我们可以使用Quarkus Funqy实现serverless无服务器FaaS方法,或者使用SmallRye反应式消息传递与Apache Kafka集成

原文地址:https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2221.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册