在上一篇中介绍了Apache Pulsar的相关概念和架构体现,这一篇说明如何通过客户端程序操作Apache Pulsar。
Apache Pulsar入门
现在我们对什么是Apache Pulsar以及它是如何工作的有了更好的了解,让我们从中获得一些乐趣。
1. 首先,我们要在机器上安装并部署一个独立的集群。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.1/apache-pulsar-2.4.1-bin.tar.gz
$ tar -xzvf apache-pulsar-2.4.1-bin.tar.gz && cd apache-pulsar-2.4.1
$ ./bin/pulsar standalone
2. 然后,让我们打开一个新的终端,使用pulsar客户端启动一个新的消费者。
$ ./bin/pulsar-client consume -s "my-first-subscription" my-first-topic -n 10
上面的命令创建一个独占使用者,在停止之前等待10条消息。
3. 最后,在另一个新终端中,我们将生成一些消息。
$> bin/pulsar-client produce my-first-topic — messages "Hello Streams World, Make sense of streams processing"
现在,如果你回到消费终端,你会看到这样的输出:
[pulsar-client-io-1–1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum — Failed to load Circe JNI library. Falling back to Java based CRC32c provider
— — — got message — — -
Hello Streams World
— — — got message — — -
Make sense of streams processing
[pulsar-timer-4–1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl — [my-first-topic] [my-first-subscription] [e81d3] Prefetched messages: 0 — — Consume throughput received: 0,03 msgs/s — — 0,00 Mbit/s — — Ack sent rate: 0,03 ack/s — — Failed messages: 0 — — Failed acks: 0
基本管理客户端命令
Apache Pulsar提供了一个丰富的管理客户机bin/pulsar-admin
,以获取有关集群状态、主题、订阅等的大量信息。让我们看看其中的一些。
列出可用群集
./bin/pulsar-admin clusters list
standalone
获取有关群集的信息
./bin/pulsar-admin clusters get standalone
{
"serviceUrl" : "http://localhost:8080",
"brokerServiceUrl" : "pulsar://locahost:6650"
}
列出在租户/命名空间下创建的所有主题:
默认情况下,主题创建为“public
”租户和“default
”命名空间下的单个分区持久主题。可以使用以下命令列出在下创建的所有主题:
$ ./bin/pulsar-admin topics list public/default
persistent://public/default/my-first-topic
让我们创建一个新的分区主题:
$ ./bin/pulsar-admin topics create-partitioned-topic --partitions 3 my-partitioned-topic
要列出分区主题,必须使用以下命令:
./bin/pulsar-admin topics list-partitioned-topics public/default
列出主题的所有订阅:
$ ./bin/pulsar-admin topics subscriptions persistent://public/default/my-first-topic
获取有关某个主题的统计信息
./bin/pulsar-admin topics stats persistent://public/default/my-first-topic
这只是对现有命令的一瞥。要了解有关可用命令的更多信息,我强烈建议您阅读官方文档:https://pulsar.apache.org/docs/en/pulsar-admin/
Java客户端
在上一部分中,我们使用pulsar客户机生成/使用了一些消息。Apache Pulsar还为java、GO和C++提供了客户端API,用于编写生产者、消费者和执行管理任务。
让我们创建一个简单的maven项目并添加Apache Pulsar客户端依赖项:
<dependencies>
<dependency>
<groupId> org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
写你的第一个Producer
首先,在实例化生产者或消费者之前,必须创建PulsArcClient
:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
其次,可以从先前创建的客户机实例化一个新的producer客户机。请注意,生产者是附加到一个主题。
Producer<byte[]> producer = client.newProducer()
.topic("my-first-topic")
.create();
默认情况下,生产者希望值以字节数组的形式发送。但是您也可以指定一个模式来生成不同的类型。
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-first-topic")
.create();
然后,您可以开始生成一些消息。方法send
将阻塞,直到从代理接收到确认。
producer.send("Hello Streams Word!");
也可以使用sendAsync
方法异步发送消息:
CompletableFuture<MessageId> future = producer.sendAsync("Make sense of streams processing");
future.thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent asynchronously\n", msgId);
});
在前面的示例中,我们通过向send/sendAsync
方法传递一个简单的值来发送记录。我们还可以使用给定的键和属性构建消息:
TypedMessageBuilder<String> message = producer.newMessage()
.key("my-key")
.value("value-message")
.property("application", "pulsar-java-quickstart")
.property("pulsar.client.version", "2.4.1");
message.send();
此外,出于性能方面的考虑,通常最好发送成批消息,以便根据吞吐量节省一些网络带宽。在创建producer客户端时可以启用b缓存。
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-first-topic")
.compressionType(CompressionType.SNAPPY)
.enableBatching(true)
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
.batchingMaxMessages(1000)
.create();
注意,启用批处理时,通常会配置压缩。
最后,永远不要忘记关闭客户端和生产者。
producer.close();
client.close();
写第一个消费者
在下面的示例中,我们将创建一个独占消费者。这意味着只有第一个使用者(对于已配置的订阅)将被分配给主题分区。尝试使用订阅的其他使用者将收到一个错误。
通过使用PulsarClient
可以简单地创建一个新的使用者实例。
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-first-topic")
.subscriptionName("my-first-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
然后,可以在while循环中调用receive()
方法来使用订阅主题中生成的任何消息。
while (true) {
// blocks until a message is available
Message<String> msg = consumer.receive();
try {
System.out.printf("Message received: %s", msg);
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
Pulsar函数
Pulsar函数是轻量级的过程,可以提交给Apache Pulsar集群来执行consumer transform-produce操作。
Pulsar函数使用来自一个或多个主题的消息,对每个记录应用一个函数,然后将结果生成一个或多个主题。
下面是一个简单的例子:
public class SplitSentenceIntoWords implements Function<String, Void> {
@Override
public Void process(String input, Context context)
throws Exception
{
String[] words = input.split(" ");
for (String word : words) {
context.newOutputMessage("split-words-topic", Schema.STRING)
.value(word)
.send();
}
return null;
}
}
请注意,函数还可以执行有状态操作。
Pulsar函数由称为函数工作者的组件执行,这些组件可以直接由代理或专用代理运行。
最后,Pulsar函数可以用Java、Python和Go编写。
Pulsar IO
Pulsar IO是一种内置功能,用于通过使用连接器将Apache Pulsar集群与数据库或其他消息传递技术等外部系统集成。
Pulsar IO定义了两种类型的连接器:
- Source:源连接器从外部系统捕获数据并将其写入Pulsar主题。
- Sink:Sink连接器使用来自Pulsar主题的消息并将它们写入外部系统。
在引擎盖下,Pulsar IO依靠Pulsar功能来实现和管理连接器。
Apache Pulsar已经为Cassandra、Aerospike、RabbitMQ等提供了连接器。
Web用户界面
对于刚开始使用Apache Pulsar的开发人员,我还推荐由Bruno Bonnin开发的Pulsar express项目(https://github.com/bbonnin/pulsar-express),它提供了一个简单的Web用户界面,用于探索主题、订阅和消费者等。
首先,要启动pulsar express,您可以使用提供的Docker图像,如下所示:
docker run -it -p 3000:3000 --network=host bbonnin/pulsar-express
然后,为独立群集创建一个新连接:http://localhost:3000/connections
最后,您可以探索在集群上创建的主题:
结论
Apache Pulsar是一个设计良好的流媒体平台,它提供了内置的企业级功能,如多租户和分层存储。
此外,Pulsar函数和Pulsar IO为实现复杂流处理应用程序和数据集成管道提供了所有必要的工具。
原文地址:https://medium.com/streamthoughts/introduction-to-apache-pulsar-concepts-architecture-java-clients-71f1a30b75d6
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2121.html
暂无评论