Kafka重复消费数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka重复消费数据相关的知识,希望对你有一定的参考价值。

参考技术A 从消息发送和消息消费两个方面去说。

「 ACK 」

0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据。

1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据。

-1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。

「消费」

写入sink没有提交offset,下次消费还从处理过的offset消费。这个时候要么做到幂等不影响下游。要么就是事务+两阶段提交。

在消费者处理数据慢的时候(消费能力低),消费者会重复消费某个局部数据。在消费能力不变的情况下,陷入死循环。只有在消费能力增强后,才会跳出这个重复消费的死循环。

原理解析:

上图就是完整的kafka消费的过程,在consumer里面配置了一个超时时间。如果步骤2处理消息超时,那么consumer进行第3步会失败。这时候再次进入步骤1拉取重复的数据,如此往复。

Kafka重复消费和丢失数据研究


Kafka重复消费原因

底层根本原因:已经消费了数据,但是offset没提交。

原因1:强行kill线程,导致消费后的数据,offset没有提交。

原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。例如:

try {

consumer.unsubscribe();

} catch (Exception e) {

}

try {

consumer.close();

} catch (Exception e) {

}

上面代码会导致部分offset没提交,下次启动时会重复消费。


Kafka Consumer丢失数据原因

猜测:设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。



 

记录offset和恢复offset的方案

 

理论上记录offset,下一个group consumer可以接着记录的offset位置继续消费。

offset记录方案:

每次消费时更新每个topic+partition位置的offset在内存中,

Map<key, value>,key=topic+‘-‘+partition,value=offset

当调用关闭consumer线程时,把上面Map的offset数据记录到 文件中*(分布式集群可能要记录到redis中)。

 

下一次启动consumer,需要读取上一次的offset信息,方法是 以当前的topic+partition为key,从上次的Map中去寻找offset。

然后使用consumer.seek()方法指定到上次的offset位置。

 

说明:

1、该方案针对单台服务器比较简单,直接把offset记录到本地文件中即可,但是对于多台服务器集群,offset也要记录到同一个地方,并且需要做去重处理。

      如果线上程序是由多台服务器组成的集群,是否可以用一台服务器来支撑?应该可以,只是消费慢一点,没多大影响。


2、如何保证接着offset消费的数据正确性

为了确保consumer消费的数据一定是接着上一次consumer消费的数据,

consumer消费时,记录第一次取出的数据,将其offset和上次consumer最后消费的offset进行对比,如果相同则继续消费。如果不同,则停止消费,检查原因。






以上是关于Kafka重复消费数据的主要内容,如果未能解决你的问题,请参考以下文章

Kafka重复消费和丢失数据研究

kafka重复消费问题

kafka重复消费问题

FLinkFlink 消费 kafka 消费组 死掉 Marking the coordinator dead for group 造成数据重复消费

kafka重复消费的问题

Kafka的重复、丢数据及顺序消费等问题