吃透Kafka三:详细分析生产和消费对消息的保障
Posted 吃透Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了吃透Kafka三:详细分析生产和消费对消息的保障相关的知识,希望对你有一定的参考价值。
一,kafka消息传递的三种语义
- At most once:最多一次,消息可能会丢失,但不会重复。
- At least once:最少一次,消息不会丢失,可能会重复。
- Exactly once:只且一次,消息不丢失不重复,只且消费一次。
我们使用kafka肯定是希望Exactly once,要保证Exactly once效率上面需要做出一定的牺牲,那么是要Exactly once还是效率就要结合具体业务来分析了。但是整体的消息投递语义需要 Producer 端和 Consumer 端两者来保证。
二,Producer 消息生产端
当 producer 向 broker 发送一条消息,这时网络出错了,producer 无法得知 broker 是否接受到了这条消息。网络出错可能是发生在消息传递的过程中,也可能发生在 broker 已经接受到了消息,并返回 ack 给 producer 的过程中。
这时,producer 会有两种选择:
- 不管了(At most once),消息可能会丢失
- 再发一次(At least once),消息可能会重复
那么如何保证Exactly once,就要结合Ack和send返回结果来保证了。
1,ack确认机制:
- Ack=0,相当于异步发送,意味着producer不等待broker同步完成,消息发送完毕继续发送下一批信息。提供了最低延迟,但持久性最弱,当服务器发生故障时很可能发生数据丢失。如果leader死亡,producer继续发送消息,broker接收不到数据就会造成数据丢失。
- Ack=1,producer要等待leader成功收到消息并确认,才发送下一条message。提供较低的延迟性以及较好的持久性。但是如果partition下的leader死亡,而follower尚未复制数据,数据就会丢失。
- Ack=-1,leader收到所有消息,且follower同步完数据,才发送下一条数据。延迟性最差,持久性最好(即可靠性最好)。
2,send发送函数:
Kafka为生产者生产消息提供了一个 send(producerRecord)
方法,另有一个重载的方法send(producerRecord, callback)
。
1,通过send方法返回的future,同步阻塞拿到结果。
Future<RecordMetadata> future = producer.send(record)
上面的示例代码也可以看到,send
返回的是一个 Future
,也就是说其实你是可以 Future.get()
获取返回值,但是future是同步阻塞的,这种效率比较低。
2,通过send方法提供的异步回到拿到结果。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic", key, value);
producer.send(myRecord,
new Callback()
public void onCompletion(RecordMetadata metadata, Exception e)
);
send(msg, callback)该方法可以将一条消息发送出去,并且可以从callback
回调中得到该条消息的发送结果, 并且callback
是异步回调,所以在兼具性能的情况下,也对消息具有比较好的掌控。
3,幂等producer
所谓幂等producer指producer.send的逻辑是幂等的,即发送相同的Kafka消息,broker端不会重复写入消息。同一条消息Kafka保证底层日志中只会持久化一次,既不会丢失也不会重复。幂等性可以极大地减轻下游consumer系统实现消息去重的工作负担,因此是非常实用的功能。值得注意的是,幂等producer提供的语义保证是有条件的:
- 单分区幂等性:幂等producer无法实现多分区上的幂等性。如前所述,若要实现多分区上的原子性,需要引入事务。
- 单会话幂等性:幂等producer无法跨会话实现幂等性。即使同一个producer宕机并重启也无法保证消息的EOS语义。
虽然有上面两个限制,幂等producer依然是一个非常实用的新功能。下面我们来讨论下它的设计原理。如果要实现幂等性, 通常都需要花费额外的空间来保存状态以执行消息去重。Kafka的幂等producer整体上也是这样的思想。
首先,producer对象引入了一个新的字段:Producer ID(下称PID),它唯一标识一个producer,当producer启动时Kafka会为每个producer分配一个PID(64位整数),因此PID的生成和分配对用户来说是完全透明的,用户无需考虑PID的事情,甚至都感受不到PID的存在。
其次,0.11 Kafka重构了消息格式,引入了序列号字段(sequence number,下称seq number)来标识某个PID producer发送的消息。和consumer端的offset类似,seq number从0开始计数并严格单调增加。同时在broker端会为每个PID(即每个producer)保存该producer发送过来的消息batch的某些元信息,比如PID信息、消息batch的起始seq number及结束seq number等。这样每当该PID发送新的消息batch时,Kafka broker就会对比这些信息,如果发生冲突(比如起始seq number和结束seq number与当前缓存的相同),那么broker就会拒绝这次写入请求。倘若没有冲突,那么broker端就会更新这部分缓存然后再开始写入消息。
这就是Kafka实现幂等producer的设计思路:
- 为每个producer设置唯一的PID。
- 引入seq number以及broker端seq number缓存更新机制来去重。
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。
4,producer事务
事务可以支持多分区的数据完整性,原子性。并且支持跨会话的exactly once处理语义,也就是说如果producer宕机重启,依旧能保证数据只处理一次。
开启事务也很简单,首先需要开启幂等性,即设置enable.idempotence为true。然后对producer发送代码做一些小小的修改。
//初始化事务
producer.initTransactions();
try
//开启一个事务
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
//提交
producer.commitTransaction();
catch (KafkaException e)
//出现异常的时候,终止事务
producer.abortTransaction();
所以无论开启幂等还是事务的特性,都会对性能有一定影响,这是必然的。所以kafka默认也并没有开启这两个特性,而是交由开发者根据自身业务特点进行处理。
三,Consumer 消息消费端
1,at-most-once
最多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是:
- enable.auto.commit设置为true。
- auto.commit.interval.ms设置为一个较低的时间范围。
由于上面的配置,此时kafka会有一个独立的线程负责按照指定间隔提交offset。
消费者的offset已经提交,但是消息还在处理中(还没有处理完),这个时候程序挂了,导致数据没有被成 功处理,再重启的时候会从上次提交的offset处消费,导致上次没有被成功处理的消息就丢失了。
2,at-least-once
实现最少一次消费语义的消费者也很简单:
- 设置enable.auto.commit为false。
- 消息处理完之后手动调用consumer.commitSync()。
这种方式是在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次
3,exactly-once
这种语义可以保证数据只被消费处理一次。
- 将enable.auto.commit设置为false。
- 使用consumer.seek(topicPartition,offset)来指定offset。
- 在处理消息的时候,要同时保存住每个消息的offset。
以原子事务的方式保存offset和处理的消息结果,这个时候相当于自己保存offset信息了,把offset和具体的数据绑定到一块,数据真正处理成功的时候才会保存offset信息,这样就可以保证数据仅被处理一次了。
4,exactly-once实现
从kafka的消费机制,我们可以得到是否能够精确的消费关键在消费进度信息的准确性,如果能够保证消费进度的准确性,也就保证了消费数据的准确性:
- 数据有状态:可以根据数据信息进行确认数据是否重复消费,这时候可以使用手动提交的最少一次消费语义实现,即使消费的数据有重复,可以通过状态进行数据去重,以达到幂等的效果
- 存储数据容器具备幂等性:在数据存入的容器具备天然的幂等(比如ElasticSearch的put操作具备幂等性,相同的数据多次执行Put操作和一次执行Put操作的结果是一致的),这样的场景也可以使用手动提交的最少一次消费语义实现,由存储数据端来进行数据去重。
但是当数据无状态,并且存储容器不具备幂等时,这种场景需要自行控制offset的准确性:
- 利用consumer.seek(topicPartition,offset)方法可以指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,如果是第一次启动,那么数据库中将没有offset信息,需要进行消费的元数据插入,然后从offset=0开始消费。
- 关系型数据库具备事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操作是在同一个事务中进行
- 使用ConsumerRebalanceListener来完成在分配分区时和Relalance时作出相应的处理逻辑
要弄清楚的是,我们在消费的时候,关闭了自动提交,我们也没有通过consumer.commitAsync()手动提交我们的位移信息,而是在每次启动一个新的consumer的时候,触发rebalance时,读取数据库中的位移信息,从该位移中开始读取partition的信息(初始化的时候为0),在没有出现异常的情况下,我们的consumer会不断从producer读取信息,这个位移是最新的那个消息位移,而且会同时把这个位移更新到数据库中,但是,当出现了rebalance时,那么consumer就会从数据库中读取开始的位移。
以上是关于吃透Kafka三:详细分析生产和消费对消息的保障的主要内容,如果未能解决你的问题,请参考以下文章