Kafka 是如何实现exactly once语义的?

Posted YaoYong_BigData

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 是如何实现exactly once语义的?相关的知识,希望对你有一定的参考价值。

一、消息语义概述

在分布式系统中,构成系统的任何节点都是被定义为可以彼此独立失败的。比如在 Kafka中,broker可能会crash,在producer推送数据至topic的过程中也可能会遇到网络问题。根据producer处理此类故障所采取的提交策略类型,我们可以获得不同的语义:

  • 至少一次(at-least-once):消息不会丢失,但有可能被重复发送。如果producer收到来自Kafka broker的确认(ack)或者acks = all,则表示该消息已经写入到Kafka。但如果producer ack超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入Kafka。如果broker在发送Ack之前失败,但在消息成功写入Kafka之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终consumer,这种策略可能导致重复的工作和不正确的结果。
  • 最多一次(at-most-once):消息可能会丢失,但绝不会被重复发送。如果在ack超时或返回错误时producer不重试,则该消息可能最终不会写入Kafka,因此不会传递给consumer。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。
  • 精确一次(exactly-once):消息不会丢失,也不会被重复发送。即使producer重试发送消息,消息也会保证最多一次地传递给最终consumer。该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作。例如如果在消费消息成功后,将Kafka consumer的偏移量rollback,我们将会再次从该偏移量开始接收消息。这表明消息传递系统和客户端应用程序必须配合调整才能实现excactly-once。

目前,Kafka 默认提供的交付可靠性保障是第一种,即至少一次

这样虽然不出丢失消息,但是会导致消息重复发送

Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。

这样一来肯定不会重复发送,但是可能会丢失消息。

Kafka 分别通过 幂等性(Idempotence)和事务(Transaction)这两种机制实现了 精确一次(exactly once)语义。

二、必须处理的常见灾难场景

为了清楚描述实现 exactly-once delivery语义的挑战,我们来看一个简单的例子。
假设有某个单进程producer应用在发送"Hello Kafka"到某个单partition topic,有一个运行在其他节点的单实例consumer从topic里拉数据并进行打印。理想情况下如果没有任何灾难发生的话,"Hello Kafka"将会被exactly-once传递,consumer获取消息进行消费并提交commit到Kafka去完成这一次消息处理。即使在这之后consumer挂了或者被重启,也不会再收到这条消息。
然而生产环境错综复杂,灾难场景是无法避免的:

  1. Broker失败:Kafka,作为一个高可用、持久化系统,保证每条消息被持久化并且冗余多份(假设是n份),所以理论上Kafka可以容忍n-1台broker宕机。Kafka的备份机制保证了一旦消息被成功写入leader replica,将会把数据同步到其他所有replica。
  2. Producer到Broker的RPC失败:Kafka的durability特性是基于producer从broker收到的ack的,而没有收到ack并不代表请求肯定失败。Broker可能会在消息被写入之后返回ack之前宕机,同时也可能会在消息被写入topic之前宕机。因为producer没有任何途径可以得知失败的真实原因,而只会尝试重试。在一些场景下,下游consumer会收到若干的重复数据。
  3. 客户端也可能会失败:Exactly-once delivery也必须考虑客户端失败的情况。但是我们如何去区分客户端是真的挂了(永久性宕机)还是说只是暂时丢失心跳?追求正确性的话,broker应该丢弃由zombie producer发送的消息。 consumer也是如此,一旦新的客户端实例已经启动,它必须能够从失败实例的任何状态中恢复,并从安全点(safe checkpoint)开始处理,这意味着消费的偏移量必须始终与生成的输出保持同步。

三、幂等性(Idempotence)---partition内部的exactly-once顺序语义

幂等这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。

幂等性最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。

为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。

  1.  PID:每个Producer在初始化时,都会分配一个唯一的PID,作为每个Producer会话的唯一标识,这个PID对用户来说,是透明的。
  2.  Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。

Kafka Broker 收到消息后会以 ProducerID 为单位存储 SequenceNumber,也就是说即使 Producer 重复发送了, Broker 端也会将其过滤掉。

由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条,不会出现重复发送的情况。

Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:

  • 如果消息序号比Broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
  • 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

上述设计解决了0.11.0.0之前版本中的两个问题:

  • Broker保存消息后,发送ACK前宕机,Producer认为消息未发送成功并重试,造成数据重复;
  • 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序。

实现比较简单,同样的限制也比较大:

        首先,它只能保证单分区上的幂等性。即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。因为 SequenceNumber 是以 Topic + Partition 为单位单调递增的,如果一条消息被发送到了多个分区必然会分配到不同的 SequenceNumber ,导致重复问题。
        其次,它只能实现单会话上的幂等性。不能实现跨会话的幂等性。当你重启 Producer 进程之后,这种幂等性保证就丧失了。重启 Producer 后会分配一个新的 ProducerID,相当于之前保存的 SequenceNumber 就丢失了。

四、事务性:跨partition的原子性写操作

上述幂等设计只能保证单个 Producer 对于同一个 <Topic, Partition> 的 Exactly Once 语义。

Kafka 现在通过新的事务 API 支持跨分区原子写入。这将允许一个生产者发送一批到不同分区的消息,这些消息要么全部对任何一个消费者可见,要么对任何一个消费者都不可见。这个特性也允许在一个事务中处理消费数据和提交消费偏移量,从而实现端到端的精确一次语义。

为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的 ID,也即Transaction ID 。 Transactin ID 与 PID 可能一一对应。区别在于 Transaction ID 由用户提供,将生产者的 transactional.id 配置项设置为某个唯一ID。而 PID 是内部的实现对用户透明。

另外,为了保证新的 Producer 启动后,旧的具有相同 Transaction ID 的 Producer 失效,每次 Producer 通过 Transaction ID 拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。

下面是的代码片段演示了事务 API 的使用:

Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
// 保证新的事务在一个正确的状态下启动
producer.initTransactions();
// 开始事务
producer.beginTransaction();
// 消费数据
ConsumerRecords<String, String> records = consumer.poll(100);
try
    // 发送数据
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
    // 发送消费数据的Offset,将上述数据消费与数据发送纳入同一个Transaction内
    producer.sendOffsetsToTransaction(offsets, "group1");
    // 数据发送及Offset发送均成功的情况下,提交事务
    producer.commitTransaction();
 catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) 
    // 数据发送或者Offset发送出现异常时,终止事务
    producer.abortTransaction();
 finally 
    // 关闭Producer和Consumer
    producer.close();
    consumer.close();

 

需要注意的是,上述的事务保证是从Producer的角度去考虑的。从Consumer的角度来看,该保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:

  • 对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖;
  • 事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失;
  • Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
  • Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息。

以上是关于Kafka 是如何实现exactly once语义的?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 是如何实现exactly once语义的?

Kafka Exactly Once语义与事务机制原理

Kafka设计解析- Exactly Once语义与事务机制原理

kafka实现无消息丢失与精确一次语义(exactly once)处理

Flink是如何实现exactly-once语义的

SparkStreaming实现Exactly-Once语义