3年前 (2021-07-14)  相关技术 |   抢沙发  2187 
文章评分 0 次,平均分 0.0
[收起] 文章目录

在上一篇中介绍了Apache Pulsar的相关概念和架构体现,这一篇说明如何通过客户端程序操作Apache Pulsar

Apache Pulsar入门

现在我们对什么是Apache Pulsar以及它是如何工作的有了更好的了解,让我们从中获得一些乐趣。

1. 首先,我们要在机器上安装并部署一个独立的集群。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.1/apache-pulsar-2.4.1-bin.tar.gz
$ tar -xzvf apache-pulsar-2.4.1-bin.tar.gz && cd apache-pulsar-2.4.1
$ ./bin/pulsar standalone

2. 然后,让我们打开一个新的终端,使用pulsar客户端启动一个新的消费者。

$ ./bin/pulsar-client consume -s "my-first-subscription" my-first-topic -n 10

上面的命令创建一个独占使用者,在停止之前等待10条消息。

3. 最后,在另一个新终端中,我们将生成一些消息。

$> bin/pulsar-client produce my-first-topic — messages "Hello Streams World, Make sense of streams processing"

现在,如果你回到消费终端,你会看到这样的输出:

[pulsar-client-io-1–1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum — Failed to load Circe JNI library. Falling back to Java based CRC32c provider
 — — — got message — — -
Hello Streams World
 — — — got message — — -
 Make sense of streams processing
[pulsar-timer-4–1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl — [my-first-topic] [my-first-subscription] [e81d3] Prefetched messages: 0 — — Consume throughput received: 0,03 msgs/s — — 0,00 Mbit/s — — Ack sent rate: 0,03 ack/s — — Failed messages: 0 — — Failed acks: 0

基本管理客户端命令

Apache Pulsar提供了一个丰富的管理客户机bin/pulsar-admin,以获取有关集群状态、主题、订阅等的大量信息。让我们看看其中的一些。

列出可用群集

./bin/pulsar-admin clusters list 
standalone

获取有关群集的信息

./bin/pulsar-admin clusters get standalone
{
 "serviceUrl" : "http://localhost:8080",
 "brokerServiceUrl" : "pulsar://locahost:6650"
}

列出在租户/命名空间下创建的所有主题:

默认情况下,主题创建为“public”租户和“default”命名空间下的单个分区持久主题。可以使用以下命令列出在下创建的所有主题:

$ ./bin/pulsar-admin topics list public/default
persistent://public/default/my-first-topic

让我们创建一个新的分区主题:

$ ./bin/pulsar-admin topics create-partitioned-topic --partitions 3 my-partitioned-topic

要列出分区主题,必须使用以下命令:

./bin/pulsar-admin topics list-partitioned-topics public/default

列出主题的所有订阅:

$ ./bin/pulsar-admin topics subscriptions persistent://public/default/my-first-topic

获取有关某个主题的统计信息

./bin/pulsar-admin topics stats persistent://public/default/my-first-topic

这只是对现有命令的一瞥。要了解有关可用命令的更多信息,我强烈建议您阅读官方文档:https://pulsar.apache.org/docs/en/pulsar-admin/

Java客户端

在上一部分中,我们使用pulsar客户机生成/使用了一些消息。Apache Pulsar还为java、GO和C++提供了客户端API,用于编写生产者、消费者和执行管理任务。

让我们创建一个简单的maven项目并添加Apache Pulsar客户端依赖项:

<dependencies>
    <dependency>
        <groupId> org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

写你的第一个Producer

首先,在实例化生产者或消费者之前,必须创建PulsArcClient

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

其次,可以从先前创建的客户机实例化一个新的producer客户机。请注意,生产者是附加到一个主题。

Producer<byte[]> producer = client.newProducer()
        .topic("my-first-topic")
        .create();

默认情况下,生产者希望值以字节数组的形式发送。但是您也可以指定一个模式来生成不同的类型。

Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-first-topic")
        .create();

然后,您可以开始生成一些消息。方法send将阻塞,直到从代理接收到确认。

producer.send("Hello Streams Word!");

也可以使用sendAsync方法异步发送消息:

CompletableFuture<MessageId> future = producer.sendAsync("Make sense of streams processing");
future.thenAccept(msgId -> {
    System.out.printf("Message with ID %s successfully sent asynchronously\n", msgId);
});

在前面的示例中,我们通过向send/sendAsync方法传递一个简单的值来发送记录。我们还可以使用给定的键和属性构建消息:

TypedMessageBuilder<String> message = producer.newMessage()
        .key("my-key")
        .value("value-message")
        .property("application", "pulsar-java-quickstart")
        .property("pulsar.client.version", "2.4.1");
message.send();

此外,出于性能方面的考虑,通常最好发送成批消息,以便根据吞吐量节省一些网络带宽。在创建producer客户端时可以启用b缓存。

Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-first-topic")
        .compressionType(CompressionType.SNAPPY)
        .enableBatching(true)
        .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
        .batchingMaxMessages(1000)
        .create();

注意,启用批处理时,通常会配置压缩。

最后,永远不要忘记关闭客户端和生产者。

producer.close();
client.close();

写第一个消费者

在下面的示例中,我们将创建一个独占消费者。这意味着只有第一个使用者(对于已配置的订阅)将被分配给主题分区。尝试使用订阅的其他使用者将收到一个错误。

通过使用PulsarClient可以简单地创建一个新的使用者实例。

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-first-topic")
        .subscriptionName("my-first-subscription")
        .subscriptionType(SubscriptionType.Exclusive)
        .subscribe();

然后,可以在while循环中调用receive()方法来使用订阅主题中生成的任何消息。

while (true) {
    // blocks until a message is available
    Message<String> msg = consumer.receive();

    try {
        System.out.printf("Message received: %s", msg);

        // Acknowledge the message so that it can be deleted by the message broker
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // Message failed to process, redeliver later
        consumer.negativeAcknowledge(msg);
    }
}

Pulsar函数

Pulsar函数是轻量级的过程,可以提交给Apache Pulsar集群来执行consumer transform-produce操作。

Pulsar函数使用来自一个或多个主题的消息,对每个记录应用一个函数,然后将结果生成一个或多个主题。

下面是一个简单的例子:

public class SplitSentenceIntoWords implements Function<String, Void> {

    @Override
    public Void process(String input, Context context) 
     throws Exception 
    {
        String[] words = input.split(" ");
        for (String word : words) {
            context.newOutputMessage("split-words-topic", Schema.STRING)
                .value(word)
                .send();
        }
        return null;
    }
}

请注意,函数还可以执行有状态操作。

Pulsar函数由称为函数工作者的组件执行,这些组件可以直接由代理或专用代理运行。

最后,Pulsar函数可以用Java、Python和Go编写。

Pulsar IO

Pulsar IO是一种内置功能,用于通过使用连接器将Apache Pulsar集群与数据库或其他消息传递技术等外部系统集成。

Pulsar IO定义了两种类型的连接器:

  • Source:源连接器从外部系统捕获数据并将其写入Pulsar主题。
  • Sink:Sink连接器使用来自Pulsar主题的消息并将它们写入外部系统。

在引擎盖下,Pulsar IO依靠Pulsar功能来实现和管理连接器。

Apache Pulsar已经为Cassandra、Aerospike、RabbitMQ等提供了连接器。

Web用户界面

对于刚开始使用Apache Pulsar的开发人员,我还推荐由Bruno Bonnin开发的Pulsar express项目(https://github.com/bbonnin/pulsar-express),它提供了一个简单的Web用户界面,用于探索主题、订阅和消费者等。

首先,要启动pulsar express,您可以使用提供的Docker图像,如下所示:

docker run -it -p 3000:3000 --network=host bbonnin/pulsar-express

然后,为独立群集创建一个新连接:http://localhost:3000/connections

Apache Pulsar系列之-客户端

最后,您可以探索在集群上创建的主题:

Apache Pulsar系列之-客户端

结论

Apache Pulsar是一个设计良好的流媒体平台,它提供了内置的企业级功能,如多租户和分层存储。

此外,Pulsar函数和Pulsar IO为实现复杂流处理应用程序和数据集成管道提供了所有必要的工具。

原文地址:https://medium.com/streamthoughts/introduction-to-apache-pulsar-concepts-architecture-java-clients-71f1a30b75d6

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2121.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册