在本文中,您将学习如何使用Kafka和Spring Cloud在Knative上运行事件应用程序。我将向您展示什么是Knative Eventing,以及如何将其与Kafka broker集成。我们将在Spring Cloud函数和Spring Cloud Stream的基础上构建我们的应用程序。所有这些解决方案似乎都是完美的搭配。
如果你想自己试试,你可以随时看看我的源代码。为此,您需要克隆我的GitHub存储库:https://github.com/piomin/sample-spring-boot-saga-eventing.git
今天,我们将基于符合最终一致性模式的简单体系结构。它也被称为传奇模式。那到底是什么?示例系统由三个服务组成。订单服务创建与客户和产品相关的新订单。这个命令被发送到卡夫卡主题。然后,我们的另外两个应用程序客户服务和产品服务接收订单事件。之后,他们进行预订。客户服务在客户账户上保留订单金额。同时,产品服务部保留了订单中指定的一些产品。这两个服务都通过Kafka主题向order服务发送响应。如果订单服务从两个服务接收到肯定的预订,它将确认订单。然后,它发送一个包含该信息的事件。客户服务部和产品服务部都会收到活动并确认预订。您可以在下面的图片中进行验证。
先决条件
在开始之前,我们首先需要在Kubernetes集群上安装Knative。我用的是Kubernetes的本地实例。但你也可以用任何遥控器,比如GKE。但是,最新版本的Knative需要Kubernetes cluster v1.17或更高版本。当然,我们需要同时安装服务和事件组件。您可以在这里找到详细的安装说明。
还不止这些。我们还需要安装Kafka事件代理。这是发布网站的链接。它包括几个部署和crd。您应该特别注意KafkaSource和KafkaBinding crd,因为我们稍后将使用它们。
最后,我们需要在Kubernetes上安装Kafka集群。建议使用Strimzi运算符来执行此操作。Strimzi为在Kubernetes上运行Kafka提供了容器映像和操作符。它还附带了一组用于管理Kafka集群的crd。安装后,您可以继续下一步。我把它安装在Kafka的名字空间里。以下是跑步舱的列表。
步骤1:创建和配置Knative Kafka Broker
在第一步中,我们将使用Strimzi CRD创建一个Kafka集群。为了简化,我们将不使用任何更高级的配置设置。例如,我使用了临时存储,这在生产中是不推荐的。我设置了三个Zookeeper实例。我听说Kafka终于打算辞去Zookeeper的职务,但现在的版本仍然是基于这个。
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 1
listeners:
plain: {}
tls: {}
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
log.message.format.version: "2.4"
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
Knative代理允许将事件路由到不同的事件接收器或使用者。我们可以使用不同的代理提供者。当事件被发送到代理时,除了CloudEvent数据和上下文属性之外的所有请求元数据都被剥离。事件传递机制对事件生产者和使用者隐藏事件路由的细节。默认的代理类是MTChannelBasedBroker。我们要把它改成Kafka。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: default
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
在本文中,我们不会直接使用Kafka代理。相反,我们将使用KafkaSource对象,它从特定主题获取事件并将它们发送给订阅者。如果您想使用代理,您需要定义引用它的Knative触发器。
代理引用ConfigMap kafka代理config。最重要的是设置kafka集群的地址。如果您没有更改默认kafka安装文件中的任何内容,那么它是${KAFKA_CLUSTER_NAME}-kafka-bootstrap
和端口9092。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
default.topic.partitions: "10"
default.topic.replication.factor: "1"
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
步骤2:使用Spring Cloud Stream创建应用程序
让我们从依赖性开始。我们的每个应用程序都使用内存中的H2数据库。它们使用Spring Data JPA存储库模式与数据库集成。不过,最重要的是,它们都是基于Spring Cloud Stream与 Kafka Topic进行互动的。Spring Cloud Stream需要向类路径添加一个具体的绑定器实现。这就是为什么我们添加了Spring Cloud开始Kafka开始。一段时间以来,Spring Cloud Stream编程模型是建立在Spring Cloud函数之上的。幸运的是,我们可以很容易地将函数导出为HTTP端点。这个功能以后会对我们有用。现在,让我们看看包含的依赖项列表。
org.springframework.boot
spring-boot-starter-data-jpa
org.springframework.cloud
spring-cloud-starter-function-web
org.springframework.cloud
spring-cloud-stream
org.springframework.cloud
spring-cloud-starter-stream-kafka
org.projectlombok
lombok
1.18.16
com.h2database
h2
runtime
下面是订单服务的模型类。一旦订单被创建并保存在数据库中,订单服务就会将其发送到输出Kafka主题。
@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {
@Id
private Integer id;
private Integer customerId;
private Integer productId;
private int amount;
private int productCount;
@Enumerated
private OrderStatus status = OrderStatus.NEW;
}
在order服务应用程序主类中有三个函数。其中两个连续地向输出目的地发送事件。另一方面,它们中的第三个confirm()
等待传入事件。我们稍后再讨论。orderEventSupplier
函数代表了我们场景中的第一步。它用测试数据创建一个新的订单,在发送之前将其保存在数据库中。
@SpringBootApplication
@Slf4j
public class OrderSagaApplication {
public static void main(String[] args) {
SpringApplication.run(OrderSagaApplication.class, args);
}
private static int num = 0;
private BlockingQueue queue = new LinkedBlockingQueue<>();
@Bean
public Supplier orderEventSupplier() {
return () -> repository.save(new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW));
}
@Bean
public Supplier orderConfirmSupplier() {
return () -> queue.poll();
}
@Bean
public Consumer> confirm() {
return this::doConfirm;
}
@Autowired
OrderRepository repository;
private void doConfirm(Message msg) {
Order o = msg.getPayload();
log.info("Order received: {}", o);
Order order = repository.findById(o.getId()).orElseThrow();
if (order.getStatus() == OrderStatus.NEW) {
order.setStatus(OrderStatus.IN_PROGRESS);
} else if (order.getStatus() == OrderStatus.IN_PROGRESS) {
order.setStatus(OrderStatus.CONFIRMED);
log.info("Order confirmed : {}", order);
queue.offer(order);
}
repository.save(order);
}
}
输出Kafka主题的名称是order events
。我们使用Spring Cloud Stream绑定模式为这两个供应商函数设置它。另一方面,Consumer函数不会直接从Kafka主题接收事件。为什么?因为它是Knative事件处理过程的一部分,我将在后面的步骤中解释它。目前,使用spring.cloud.function.definition
属性指定只有供应商绑定到外部目标非常重要。
spring.application.name: order-saga
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: order-events
spring.cloud.stream.bindings.orderConfirmSupplier-out-0.destination: order-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier;orderConfirmSupplier
最后,我们需要创建KafkaBinding,它将Kafka引导信息注入到应用程序容器中(通过Knative服务)。然后,应用程序可以作为KAFKA_BOOTSTRAP_SERVERS
环境变量访问它。
apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
name: kafka-binding-order-saga
spec:
subject:
apiVersion: serving.knative.dev/v1
kind: Service
name: order-saga
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
步骤3:创建Kafka源和Spring Cloud Function端点
好的,我们已经创建了一个函数,负责生成订单并将订单发送到订单服务中的Kafka主题。所以,现在我们的目标是从客户服务和产品服务两方面来接收和处理。我们的应用程序不会直接监听卡Kafka题的传入事件。澄清一下,Knative事件处理的基本假设是应用程序不关心事件是如何发布的。它将以HTTP POST的形式接收事件。卡夫卡资源对象来了。它将输入主题列表和目标接收器作为参数。在我们的例子中,它从order
事件获取消息,并将其作为httppost发送到customer saga Knative
服务的端点/customers/reserve
。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
topics:
- order-events
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: customer-saga
uri: /customers/reserve
下面是customer saga
应用程序的实现。由于SpringCloudFunctionWeb
,它自动将reserve
函数导出为带有路径/reserve的HTTP端点。一旦消费者收到事件,它就执行其余的业务逻辑。如果输入订单有一个新的状态,customer saga将为客户帐户上的特定金额创建预订。然后它向顺序saga发送事件响应。换句话说,它首先将事件放入BlockingQueue
。我们还使用一个Supplier
函数将事件发送到Kafka主题。这一次,supplier
函数从BlockingQueue获取Order
对象。最后,如果我们的应用程序从order saga收到确认订单,它将通过删除保留金额来提交整个事务。
@SpringBootApplication
@Slf4j
public class CustomerSagaApplication {
public static void main(String[] args) {
SpringApplication.run(CustomerSagaApplication.class, args);
}
private BlockingQueue queue = new LinkedBlockingQueue<>();
@Autowired
private CustomerRepository repository;
@Bean
public Supplier orderEventSupplier() {
return () -> queue.poll();
}
@Bean
public Consumer> reserve() {
return this::doReserve;
}
private void doReserve(Message msg) {
Order order = msg.getPayload();
log.info("Body: {}", order);
Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
log.info("Customer: {}", customer);
if (order.getStatus() == OrderStatus.NEW) {
customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
order.setStatus(OrderStatus.IN_PROGRESS);
queue.offer(order);
} else if (order.getStatus() == OrderStatus.CONFIRMED) {
customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
}
repository.save(customer);
}
}
我们还可以使用spring.cloud.function.web.path
属性为HTTP端点设置基本上下文路径。因此,目标端点的最终路径是/customers/reserver
。它与KafkaSource定义中定义的地址相同。
spring.cloud.function.web.path: /customers
下面是application.yml
文件中客户传奇的配置。
spring.application.name: customer-saga
spring.cloud.function.web.path: /customers
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier
产品业务逻辑的实现与客户逻辑非常相似。只有一个用户函数接收订单,一个供应商负责发送订单响应。
@SpringBootApplication
@Slf4j
public class ProductSagaApplication {
public static void main(String[] args) {
SpringApplication.run(ProductSagaApplication.class, args);
}
@Autowired
private ProductRepository repository;
private BlockingQueue queue = new LinkedBlockingQueue<>();
@Bean
public Supplier orderEventSupplier() {
return () -> queue.poll();
}
@Bean
public Consumer> reserve() {
return this::doReserve;
}
private void doReserve(Message msg) {
Order order = msg.getPayload();
log.info("Body: {}", order);
Product product = repository.findById(order.getProductId()).orElseThrow();
log.info("Product: {}", product);
if (order.getStatus() == OrderStatus.NEW) {
product.setReservedItems(product.getReservedItems() + order.getProductsCount());
product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
order.setStatus(OrderStatus.IN_PROGRESS);
queue.offer(order);
} else if (order.getStatus() == OrderStatus.CONFIRMED) {
product.setReservedItems(product.getReservedItems() - order.getProductsCount());
}
repository.save(product);
}
}
第4步:在Knative Eventing和Kafka上运行应用程序
下面是我们应用程序的Knative服务的典型定义。我使用的是dev.local
选项,但是如果您运行远程集群,则可以用Docker用户名或任何其他存储库帐户替换它。
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: order-saga
spec:
template:
spec:
containers:
- image: dev.local/order-saga
我使用Skaffold
和jibmaven
插件在Knative上构建和部署应用程序。我的目标命名空间是无服务器的。使用tail
选项,您可以在部署后观察日志。当然,您也可以使用skaffold dev
命令。
$ skaffold run --tail -n serverless
在使用Kafka在Knative eventing上运行所有应用程序之后,我们可以使用kncli
验证服务列表。
然后,我们可以验证所有的KafkaBindings都被创建了。为此,让我们执行以下kubectl
命令。
下一个重要的组成部分是KafkaSource。我们已经创建了三个源,每个应用程序一个。
启动之后,order-saga
应用程序每秒不断生成并发送一个新订单。product-saga
和customer-saga
都接收事件并发送响应。多亏了这一点,交通交换没有任何中断。除了应用程序pods
,我们有三个与卡夫卡来源豆荚。
让我们看看应用程序日志。这是骑士团传奇的日志。正如您所看到的,它接收来自customer-saga
和product-saga
的订单预订。之后,它确认顺序并将响应发送回Kafka上的order事件主题。基本上,这就是我们想要达到的目标。
最后的想法
Knative仍然是一个相对较新的解决方案。我想在不久的将来我们可能会看到一些新的有趣的特性。对于Knative事件,您可以使用Kafka以外的其他事件源。就我个人而言,我正在等待与RabbitMQ的集成,它现在正在开发中。
原文地址:https://piotrminkowski.com/2021/03/12/knative-eventing-with-kafka-and-spring-cloud/
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2131.html
暂无评论