2周前 (07-16)  相关技术 |   抢沙发  14 
文章评分 0 次,平均分 0.0
[收起] 文章目录

Kafka和Spring Cloud在Knative上运行Eventing

在本文中,您将学习如何使用Kafka和Spring Cloud在Knative上运行事件应用程序。我将向您展示什么是Knative Eventing,以及如何将其与Kafka broker集成。我们将在Spring Cloud函数和Spring Cloud Stream的基础上构建我们的应用程序。所有这些解决方案似乎都是完美的搭配。

如果你想自己试试,你可以随时看看我的源代码。为此,您需要克隆我的GitHub存储库:https://github.com/piomin/sample-spring-boot-saga-eventing.git

今天,我们将基于符合最终一致性模式的简单体系结构。它也被称为传奇模式。那到底是什么?示例系统由三个服务组成。订单服务创建与客户和产品相关的新订单。这个命令被发送到卡夫卡主题。然后,我们的另外两个应用程序客户服务和产品服务接收订单事件。之后,他们进行预订。客户服务在客户账户上保留订单金额。同时,产品服务部保留了订单中指定的一些产品。这两个服务都通过Kafka主题向order服务发送响应。如果订单服务从两个服务接收到肯定的预订,它将确认订单。然后,它发送一个包含该信息的事件。客户服务部和产品服务部都会收到活动并确认预订。您可以在下面的图片中进行验证。

Kafka和Spring Cloud在Knative上运行Eventing

先决条件

在开始之前,我们首先需要在Kubernetes集群上安装Knative。我用的是Kubernetes的本地实例。但你也可以用任何遥控器,比如GKE。但是,最新版本的Knative需要Kubernetes cluster v1.17或更高版本。当然,我们需要同时安装服务和事件组件。您可以在这里找到详细的安装说明。

还不止这些。我们还需要安装Kafka事件代理。这是发布网站的链接。它包括几个部署和crd。您应该特别注意KafkaSource和KafkaBinding crd,因为我们稍后将使用它们。

最后,我们需要在Kubernetes上安装Kafka集群。建议使用Strimzi运算符来执行此操作。Strimzi为在Kubernetes上运行Kafka提供了容器映像和操作符。它还附带了一组用于管理Kafka集群的crd。安装后,您可以继续下一步。我把它安装在Kafka的名字空间里。以下是跑步舱的列表。

Kafka和Spring Cloud在Knative上运行Eventing

步骤1:创建和配置Knative Kafka Broker

在第一步中,我们将使用Strimzi CRD创建一个Kafka集群。为了简化,我们将不使用任何更高级的配置设置。例如,我使用了临时存储,这在生产中是不推荐的。我设置了三个Zookeeper实例。我听说Kafka终于打算辞去Zookeeper的职务,但现在的版本仍然是基于这个。

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

Knative代理允许将事件路由到不同的事件接收器或使用者。我们可以使用不同的代理提供者。当事件被发送到代理时,除了CloudEvent数据和上下文属性之外的所有请求元数据都被剥离。事件传递机制对事件生产者和使用者隐藏事件路由的细节。默认的代理类是MTChannelBasedBroker。我们要把它改成Kafka。

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

在本文中,我们不会直接使用Kafka代理。相反,我们将使用KafkaSource对象,它从特定主题获取事件并将它们发送给订阅者。如果您想使用代理,您需要定义引用它的Knative触发器。

代理引用ConfigMap kafka代理config。最重要的是设置kafka集群的地址。如果您没有更改默认kafka安装文件中的任何内容,那么它是${KAFKA_CLUSTER_NAME}-kafka-bootstrap和端口9092。

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

步骤2:使用Spring Cloud Stream创建应用程序

让我们从依赖性开始。我们的每个应用程序都使用内存中的H2数据库。它们使用Spring Data JPA存储库模式与数据库集成。不过,最重要的是,它们都是基于Spring Cloud Stream与 Kafka Topic进行互动的。Spring Cloud Stream需要向类路径添加一个具体的绑定器实现。这就是为什么我们添加了Spring Cloud开始Kafka开始。一段时间以来,Spring Cloud Stream编程模型是建立在Spring Cloud函数之上的。幸运的是,我们可以很容易地将函数导出为HTTP端点。这个功能以后会对我们有用。现在,让我们看看包含的依赖项列表。

org.springframework.boot
    spring-boot-starter-data-jpa


    org.springframework.cloud
    spring-cloud-starter-function-web


    org.springframework.cloud
    spring-cloud-stream


    org.springframework.cloud
    spring-cloud-starter-stream-kafka


    org.projectlombok
    lombok
    1.18.16


    com.h2database
    h2
    runtime

下面是订单服务的模型类。一旦订单被创建并保存在数据库中,订单服务就会将其发送到输出Kafka主题。

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {

    @Id
    private Integer id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

在order服务应用程序主类中有三个函数。其中两个连续地向输出目的地发送事件。另一方面,它们中的第三个confirm()等待传入事件。我们稍后再讨论。orderEventSupplier函数代表了我们场景中的第一步。它用测试数据创建一个新的订单,在发送之前将其保存在数据库中。

@SpringBootApplication
@Slf4j
public class OrderSagaApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderSagaApplication.class, args);
    }

    private static int num = 0;
    private BlockingQueue queue = new LinkedBlockingQueue<>();

    @Bean
    public Supplier orderEventSupplier() {
        return () -> repository.save(new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW));
    }

    @Bean
    public Supplier orderConfirmSupplier() {
        return () -> queue.poll();
    }

    @Bean
    public Consumer> confirm() {
        return this::doConfirm;
    }

    @Autowired
    OrderRepository repository;

    private void doConfirm(Message msg) {
        Order o = msg.getPayload();
        log.info("Order received: {}", o);
        Order order = repository.findById(o.getId()).orElseThrow();
        if (order.getStatus() == OrderStatus.NEW) {
            order.setStatus(OrderStatus.IN_PROGRESS);
        } else if (order.getStatus() == OrderStatus.IN_PROGRESS) {
            order.setStatus(OrderStatus.CONFIRMED);
            log.info("Order confirmed : {}", order);
            queue.offer(order);
        }
        repository.save(order);
    }

}

输出Kafka主题的名称是order events。我们使用Spring Cloud Stream绑定模式为这两个供应商函数设置它。另一方面,Consumer函数不会直接从Kafka主题接收事件。为什么?因为它是Knative事件处理过程的一部分,我将在后面的步骤中解释它。目前,使用spring.cloud.function.definition属性指定只有供应商绑定到外部目标非常重要。

spring.application.name: order-saga
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: order-events
spring.cloud.stream.bindings.orderConfirmSupplier-out-0.destination: order-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier;orderConfirmSupplier

最后,我们需要创建KafkaBinding,它将Kafka引导信息注入到应用程序容器中(通过Knative服务)。然后,应用程序可以作为KAFKA_BOOTSTRAP_SERVERS环境变量访问它。

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-order-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: order-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

步骤3:创建Kafka源和Spring Cloud Function端点

好的,我们已经创建了一个函数,负责生成订单并将订单发送到订单服务中的Kafka主题。所以,现在我们的目标是从客户服务和产品服务两方面来接收和处理。我们的应用程序不会直接监听卡Kafka题的传入事件。澄清一下,Knative事件处理的基本假设是应用程序不关心事件是如何发布的。它将以HTTP POST的形式接收事件。卡夫卡资源对象来了。它将输入主题列表和目标接收器作为参数。在我们的例子中,它从order事件获取消息,并将其作为httppost发送到customer saga Knative服务的端点/customers/reserve

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

下面是customer saga应用程序的实现。由于SpringCloudFunctionWeb,它自动将reserve函数导出为带有路径/reserve的HTTP端点。一旦消费者收到事件,它就执行其余的业务逻辑。如果输入订单有一个新的状态,customer saga将为客户帐户上的特定金额创建预订。然后它向顺序saga发送事件响应。换句话说,它首先将事件放入BlockingQueue。我们还使用一个Supplier函数将事件发送到Kafka主题。这一次,supplier函数从BlockingQueue获取Order对象。最后,如果我们的应用程序从order saga收到确认订单,它将通过删除保留金额来提交整个事务。

@SpringBootApplication
@Slf4j
public class CustomerSagaApplication {

    public static void main(String[] args) {
        SpringApplication.run(CustomerSagaApplication.class, args);
    }

    private BlockingQueue queue = new LinkedBlockingQueue<>();

    @Autowired
    private CustomerRepository repository;
    
    @Bean
    public Supplier orderEventSupplier() {
        return () -> queue.poll();
    }

    @Bean
    public Consumer> reserve() {
        return this::doReserve;
    }

    private void doReserve(Message msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
        log.info("Customer: {}", customer);
        if (order.getStatus() == OrderStatus.NEW) {
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
        }
        repository.save(customer);
    }
}

我们还可以使用spring.cloud.function.web.path属性为HTTP端点设置基本上下文路径。因此,目标端点的最终路径是/customers/reserver。它与KafkaSource定义中定义的地址相同。

spring.cloud.function.web.path: /customers

下面是application.yml文件中客户传奇的配置。

spring.application.name: customer-saga
spring.cloud.function.web.path: /customers
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier

产品业务逻辑的实现与客户逻辑非常相似。只有一个用户函数接收订单,一个供应商负责发送订单响应。

@SpringBootApplication
@Slf4j
public class ProductSagaApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProductSagaApplication.class, args);
    }

    @Autowired
    private ProductRepository repository;

    private BlockingQueue queue = new LinkedBlockingQueue<>();

    @Bean
    public Supplier orderEventSupplier() {
        return () -> queue.poll();
    }

    @Bean
    public Consumer> reserve() {
        return this::doReserve;
    }

    private void doReserve(Message msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Product product = repository.findById(order.getProductId()).orElseThrow();
        log.info("Product: {}", product);
        if (order.getStatus() == OrderStatus.NEW) {
            product.setReservedItems(product.getReservedItems() + order.getProductsCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            product.setReservedItems(product.getReservedItems() - order.getProductsCount());
        }
        repository.save(product);
    }
}

第4步:在Knative Eventing和Kafka上运行应用程序

下面是我们应用程序的Knative服务的典型定义。我使用的是dev.local选项,但是如果您运行远程集群,则可以用Docker用户名或任何其他存储库帐户替换它。

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: order-saga
spec:
  template:
    spec:
      containers:
        - image: dev.local/order-saga

我使用Skaffoldjibmaven插件在Knative上构建和部署应用程序。我的目标命名空间是无服务器的。使用tail选项,您可以在部署后观察日志。当然,您也可以使用skaffold dev命令。

$ skaffold run --tail -n serverless

在使用Kafka在Knative eventing上运行所有应用程序之后,我们可以使用kncli验证服务列表。

Kafka和Spring Cloud在Knative上运行Eventing

然后,我们可以验证所有的KafkaBindings都被创建了。为此,让我们执行以下kubectl命令。

Kafka和Spring Cloud在Knative上运行Eventing

下一个重要的组成部分是KafkaSource。我们已经创建了三个源,每个应用程序一个。

Kafka和Spring Cloud在Knative上运行Eventing

启动之后,order-saga应用程序每秒不断生成并发送一个新订单。product-sagacustomer-saga都接收事件并发送响应。多亏了这一点,交通交换没有任何中断。除了应用程序pods,我们有三个与卡夫卡来源豆荚。

Kafka和Spring Cloud在Knative上运行Eventing

让我们看看应用程序日志。这是骑士团传奇的日志。正如您所看到的,它接收来自customer-sagaproduct-saga的订单预订。之后,它确认顺序并将响应发送回Kafka上的order事件主题。基本上,这就是我们想要达到的目标。

最后的想法

Knative仍然是一个相对较新的解决方案。我想在不久的将来我们可能会看到一些新的有趣的特性。对于Knative事件,您可以使用Kafka以外的其他事件源。就我个人而言,我正在等待与RabbitMQ的集成,它现在正在开发中。

原文地址:https://piotrminkowski.com/2021/03/12/knative-eventing-with-kafka-and-spring-cloud/

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2131.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册