Apache pulsar producer接口详解

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache pulsar producer接口详解相关的知识,希望对你有一定的参考价值。

参考技术A 生产者以同步或者异步的方式将消息发送给broker,pulsar提供以下三种消息发送的接口:

关闭一个producer,提供以下两种方式:

拦截器接口主要提供以下三个方法:

为了降低网络带宽,消息发送的时候,会对消息进行压缩,pulsar提供了如下三种消息压缩的方式:

给生产者指定一个名字,如果没有手动指定,系统将会为该producer生成一个全局唯一的名字。
注意:在指定名称时,用户需要确保对于给定的topic,生产者名称在所有Pulsar的集群中是唯一的。 broker将强制执行只有一个给定名称的producer可以在topic上进行publish。

添加producer用来加密数据密钥的public key.
当producer创建时,Pulsar客户端会检查是否有密钥添加到encryptionKeys中。 如果找到要添加的key,则针对每个key调用回调函数getKey(String keyName)来获取key的值。 应用程序应实现这个回调并返回pkcs8格式的密钥。 如果启用了压缩功能,则压缩后会对消息进行加密。 如果启用了批处理消息传递,则批处理消息也将被加密。

指定该producer将要把消息publish到哪一个topic

当输出(outgoing)队列已满时,是否停止相应的操作

设置包含待处理消息的队列的最大大小,以便从broker接收确认。
当队列已满时,默认的,所有的调用都会失败,除非blockIfQueueFull设置为true。可以使用blockIfQueueFull来改变这个行为。

设置所有分区中的最大挂起消息数,此设置将用于降低每个分区的最大挂起消息

为producer生产的消息设置最开始的sequenceID,如果没有额外指定,那么接下来生产的第一条消息的sequenceID就是initialSequenceId + 1,第二条消息的sequenceID依次递增。

基于当前的producer,copy一个producer出来。比如我们需要创建多个producer,并且这些producer有一些相同的属性可以复用,那么我们就可以基于这些相同的属性进行copy,然后对copy后的producer在做定制化的配置

获取producer publish的最后一个sequenceID。
如果系统中有两个名字一样的producer(原则上,这是不被允许的),该函数将返回在上一个producer发布的最后一条消息的sequenceID,如果没有消息被publish,则返回-1。

pulsar 中的 exactly once 语义

Exactly once 语义

在使用 Pulsar 过程中任何节点都有可能出现异常甚至宕机,当 Producer 生产消息时,pulsar 集群可能会发生 Broker 或 Bookie 异常不可用,或者网络突然中断等异常情况。根据在发生异常时 Producer 处理消息的方式,系统可以具备以下三种消息语义。

At-least-once (至少一次)语义

Producer 通过接收 Broker 的 ACK (消息确认)通知来确保消息成功写入 Pulsar Topic。然而,当 Producer 接收 ACK 通知超时,或者收到 Broker 出错信息时,会尝试重新发送消息。如果 Broker 正好在成功把消息写入到 Topic,但还没有给 Producer 发送 ACK 时宕机,Producer 重新发送的消息会被再次写入到 Topic,最终导致消息被重复分发至 Consumer。

At-most-once (最多一次)语义

当 Producer 在接收 ACK 超时,或者收到 Broker 出错信息时不重发消息,那就有可能导致这条消息丢失,没有写入到 Topic 中,也不会被 Consumer 消费到。在某些场景下,为了避免发生重复消费,我们可以容许消息丢失的发生。

Exactly-once (精确一次)语义

Exactly-once 语义保证了即使 Producer 多次发送同一条消息到服务端,服务端也仅仅会记录一次。

pulsar 中 Exactly-once 语义并不包含 consumer 端只消费一次的场景。因为真正意义上的Exactly-Once依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。消息系统不可能保证 consumer 只会接收一次消息,在 consumer 由于网络等原因未及时 ack 消息时,pulsar broker 就会重复投递消息给 consumer,这时候需要做的是保证 consumer 消费幂等。可能使用 “有效一次” 来描述更恰当些。

单个 Topic 的 Exactly-once 语义

从 Pulsar 1.20.0-incubating 版本开始可以通过幂等性 Producer 和 pulsar server 端消息去重来保证单个 Topic 上的 Exactly-once 语义。

什么是幂等性 Producer ?幂等性就是指对于同一操作发起的一次或者多次请求的结果是一致的,不会因为多次操作而产生不同的结果。当出现由于异常导致 Producer 重发消息时,重复的消息只会在 Broker 中写入一次。

可以通过以下方式来开启消息去重和设置幂等性 producer:

  • 在 Cluster 级别(针对所有 Namespace 下的 Topic 有效),Namespace 级别(针对该 Namespace下的 Topic 有效)或者 Topic 级别 (针对单个 Topic 有效)开启消息去重:
bin/pulsar-admin namespaces set-deduplication \\
  public/default \\
  --enable # or just -e
  • 为 Producer 设置任意的名称并且设置消息超时时间为 0
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
Producer producer = pulsarClient.newProducer()
        .producerName("producer-1")
        .topic("persistent://public/default/topic-1")
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();

实现原理:每条发送给 Pulsar 的消息都会带有一个唯一的序列号,Pulsar Broker 利用这个序列号来判断和去除重复的消息,当接收的消息的 sequenceId 小于等于 pulsar 记录的最大 sequenceId,即为重复消息。 Pulsar 会把消息体中的序列号保存到 Topic 中,并且记录最新接收到的序列号。所以哪怕 Broker 节点出现异常宕机了,另一个重新接管处理该 Topic 的 Broker 节点也可以判断消息是否重复。

多个Topic 的 Exactly-once 语义

幂等性 Producer 只能保证单个 topic 上的 exactly-once 语义,当一条消息要发送到多个 topic 时,就不能保证多个操作的原子性。

Pulsar 2.8.0 引入事务消息,我们可以通过事务 API 来实现多个 topic 发送消息的原子操作,要么都成功要么都失败。也可以在一个事务中对多个 topic 上的消息进行 ACK 确认。

在流处理系统中,常见的操作是 read-process-write。即从一个或多个 topic 中读取消息,进过程序加工处理后得出结果,最后把结果写入另一个 topic。在这个过程中如果不使用事务消息,就有可能出现结果重复或者消息丢失的情况。

  • 如果执行流程是 producer 先发送消息,然后 consumer 再 ACK 消息,程序在 ACK 前发生异常,consumer 未 ACK 成功,程序恢复后会再次消费消息,发送新计算的结果给 topic,这就会导致消息重复。

  • 如果执行流程是 consumer 先 ACK 消息,然后 producer 再发送消息,程序在 producer 发送前发生异常,程序恢复后由于消息已经 ACK ,消息将不会再次消费,这就会导致消息丢失问题。

利用事务消息实现端到端的 Exactly-once 语义。

//start
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();

//receive
Message<String> message1 = consumer1.receive();
Message<String> message2 = consumer2.receive();

//process
String result = message1.getData() + " " + message1.getData();
System.out.println(result);

//publish
producer.newMessage(txn).value(result.getBytes()).send();

//ack
consumer1.acknowledgeAsync(message1.getMessageId(), txn).get();
consumer2.acknowledgeAsync(message2.getMessageId(), txn).get();

//commit
txn.commit().get();

pulsar 事务消息主要概念

事务协调器

事务协调器(TC)是运行在 Pulsar Broker 中的一个模块。

  • 它维护事务的整个生命周期,并防止事务进入错误状态。
  • 它处理事务超时,并确保事务在事务超时后中止。

事务日志

所有事务元数据都保存在事务日志中。 事务日志由 Pulsar 主题记录。 如果事务协调器崩溃,它可以从事务日志恢复事务元数据。

事务缓存

向事务内的主题分区生成的消息存储在该主题分区的事务缓冲区(TB)中。 在提交事务之前,事务缓冲区中的消息对消费者不可见。 提交事务后,事务缓冲区中的消息对消费者可见。 当事务中止时,事务缓冲区中的消息将被丢弃。

事务 ID

事务ID(TxnID)标识Pulsar中的唯一事务。 事务 ID 长度是 128-bit。 最高 16 位保留给事务协调器的 ID,其余位用于每个事务协调器中单调递增的数字。

待确认状态

待确认状态是消息 ack 后,事务未 commit 前的状态。

pulsar 事务原理浅析

1、开启事务

2、生产事务消息

3、确认事务消息

4、结束事务请求

5、完成事务

引用

1、https://segmentfault.com/a/1190000041013432
2、https://pulsar.apache.org/docs/zh-CN/txn-how/

以上是关于Apache pulsar producer接口详解的主要内容,如果未能解决你的问题,请参考以下文章

在 Apache Pulsar 中使用注册模式发布到主题

Pulsar 介绍

Pulsar Reader 例子

pulsar 中的 exactly once 语义

pulsar 中的 exactly once 语义

pulsar 中的 exactly once 语义