pulsar 中的 exactly once 语义

Posted 回归心灵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pulsar 中的 exactly once 语义相关的知识,希望对你有一定的参考价值。

Exactly once 语义

在使用 Pulsar 过程中任何节点都有可能出现异常甚至宕机,当 Producer 生产消息时,pulsar 集群可能会发生 Broker 或 Bookie 异常不可用,或者网络突然中断等异常情况。根据在发生异常时 Producer 处理消息的方式,系统可以具备以下三种消息语义。

At-least-once (至少一次)语义

Producer 通过接收 Broker 的 ACK (消息确认)通知来确保消息成功写入 Pulsar Topic。然而,当 Producer 接收 ACK 通知超时,或者收到 Broker 出错信息时,会尝试重新发送消息。如果 Broker 正好在成功把消息写入到 Topic,但还没有给 Producer 发送 ACK 时宕机,Producer 重新发送的消息会被再次写入到 Topic,最终导致消息被重复分发至 Consumer。

At-most-once (最多一次)语义

当 Producer 在接收 ACK 超时,或者收到 Broker 出错信息时不重发消息,那就有可能导致这条消息丢失,没有写入到 Topic 中,也不会被 Consumer 消费到。在某些场景下,为了避免发生重复消费,我们可以容许消息丢失的发生。

Exactly-once (精确一次)语义

Exactly-once 语义保证了即使 Producer 多次发送同一条消息到服务端,服务端也仅仅会记录一次。

pulsar 中 Exactly-once 语义并不包含 consumer 端只消费一次的场景。因为真正意义上的Exactly-Once依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。消息系统不可能保证 consumer 只会接收一次消息,在 consumer 由于网络等原因未及时 ack 消息时,pulsar broker 就会重复投递消息给 consumer,这时候需要做的是保证 consumer 消费幂等。可能使用 “有效一次” 来描述更恰当些。

单个 Topic 的 Exactly-once 语义

从 Pulsar 1.20.0-incubating 版本开始可以通过幂等性 Producer 和 pulsar server 端消息去重来保证单个 Topic 上的 Exactly-once 语义。

什么是幂等性 Producer ?幂等性就是指对于同一操作发起的一次或者多次请求的结果是一致的,不会因为多次操作而产生不同的结果。当出现由于异常导致 Producer 重发消息时,重复的消息只会在 Broker 中写入一次。

可以通过以下方式来开启消息去重和设置幂等性 producer:

  • 在 Cluster 级别(针对所有 Namespace 下的 Topic 有效),Namespace 级别(针对该 Namespace下的 Topic 有效)或者 Topic 级别 (针对单个 Topic 有效)开启消息去重:
bin/pulsar-admin namespaces set-deduplication \\
  public/default \\
  --enable # or just -e
  • 为 Producer 设置任意的名称并且设置消息超时时间为 0
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
Producer producer = pulsarClient.newProducer()
        .producerName("producer-1")
        .topic("persistent://public/default/topic-1")
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();

实现原理:每条发送给 Pulsar 的消息都会带有一个唯一的序列号,Pulsar Broker 利用这个序列号来判断和去除重复的消息,当接收的消息的 sequenceId 小于等于 pulsar 记录的最大 sequenceId,即为重复消息。 Pulsar 会把消息体中的序列号保存到 Topic 中,并且记录最新接收到的序列号。所以哪怕 Broker 节点出现异常宕机了,另一个重新接管处理该 Topic 的 Broker 节点也可以判断消息是否重复。

多个Topic 的 Exactly-once 语义

幂等性 Producer 只能保证单个 topic 上的 exactly-once 语义,当一条消息要发送到多个 topic 时,就不能保证多个操作的原子性。

Pulsar 2.8.0 引入事务消息,我们可以通过事务 API 来实现多个 topic 发送消息的原子操作,要么都成功要么都失败。也可以在一个事务中对多个 topic 上的消息进行 ACK 确认。

在流处理系统中,常见的操作是 read-process-write。即从一个或多个 topic 中读取消息,进过程序加工处理后得出结果,最后把结果写入另一个 topic。在这个过程中如果不使用事务消息,就有可能出现结果重复或者消息丢失的情况。

  • 如果执行流程是 producer 先发送消息,然后 consumer 再 ACK 消息,程序在 ACK 前发生异常,consumer 未 ACK 成功,程序恢复后会再次消费消息,发送新计算的结果给 topic,这就会导致消息重复。

  • 如果执行流程是 consumer 先 ACK 消息,然后 producer 再发送消息,程序在 producer 发送前发生异常,程序恢复后由于消息已经 ACK ,消息将不会再次消费,这就会导致消息丢失问题。

利用事务消息实现端到端的 Exactly-once 语义。

//start
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();

//receive
Message<String> message1 = consumer1.receive();
Message<String> message2 = consumer2.receive();

//process
String result = message1.getData() + " " + message1.getData();
System.out.println(result);

//publish
producer.newMessage(txn).value(result.getBytes()).send();

//ack
consumer1.acknowledgeAsync(message1.getMessageId(), txn).get();
consumer2.acknowledgeAsync(message2.getMessageId(), txn).get();

//commit
txn.commit().get();

pulsar 事务消息主要概念

事务协调器

事务协调器(TC)是运行在 Pulsar Broker 中的一个模块。

  • 它维护事务的整个生命周期,并防止事务进入错误状态。
  • 它处理事务超时,并确保事务在事务超时后中止。

事务日志

所有事务元数据都保存在事务日志中。 事务日志由 Pulsar 主题记录。 如果事务协调器崩溃,它可以从事务日志恢复事务元数据。

事务缓存

向事务内的主题分区生成的消息存储在该主题分区的事务缓冲区(TB)中。 在提交事务之前,事务缓冲区中的消息对消费者不可见。 提交事务后,事务缓冲区中的消息对消费者可见。 当事务中止时,事务缓冲区中的消息将被丢弃。

事务 ID

事务ID(TxnID)标识Pulsar中的唯一事务。 事务 ID 长度是 128-bit。 最高 16 位保留给事务协调器的 ID,其余位用于每个事务协调器中单调递增的数字。

待确认状态

待确认状态是消息 ack 后,事务未 commit 前的状态。

pulsar 事务原理浅析

1、开启事务

2、生产事务消息

3、确认事务消息

4、结束事务请求

5、完成事务

引用

1、https://segmentfault.com/a/1190000041013432
2、https://pulsar.apache.org/docs/zh-CN/txn-how/

以上是关于pulsar 中的 exactly once 语义的主要内容,如果未能解决你的问题,请参考以下文章

pulsar 中的 exactly once 语义

21.Flink-高级特性-新特性-Exactly-Once数据一致性语义分类如何实现局部的Exactly-Once分布式快照/Checkpoint

21.Flink-高级特性-新特性-Exactly-Once数据一致性语义分类如何实现局部的Exactly-Once分布式快照/Checkpoint

流处理引擎(SPE)中的的分布式一致性语义之Exactly-Once和Effectively-Onece区别

SparkStreaming实现Exactly-Once语义

Kafka 是如何实现exactly once语义的?