防止 Confluent Kafka 在生产时丢失消息

Posted

技术标签:

【中文标题】防止 Confluent Kafka 在生产时丢失消息【英文标题】:Prevent Confluent Kafka from losing messages when producing 【发布时间】:2019-05-14 11:40:02 【问题描述】:

Confluent Kafka 库(本例中为 python 版本)有一个 producer 方法,该方法接受一个传递回调函数:

kafka_producer.produce(topic=topic,
                            key=key,
                            value=value,
                            on_delivery=delivery_callback)

无论消息是否成功发送都会调用此回调:

def delivery_callback(err, msg):

如果消息失败,我在此函数中没有任何重试逻辑,因为文档说它是异步的。

每 100 条左右的消息,我依靠 flush() 告诉我是否有任何消息没有成功生成:

messages_outstanding = kafka_producer.flush()
if messages_outstanding == 0:
   //continue to the next batch of 100
else:
   //produce the batch again

flush() 会为任何未能生成的消息负责吗? (在delivery_callback报错)

换句话说,我可以确定flush() 不会在任何消息失败时返回零

【问题讨论】:

【参考方案1】:

确认以下结果:

即使消息无法生成,调用.flush() 也绝对可以返回零。此方法似乎要等到所有消息的所有传递回调都完成(回调可以简单地报告消息传递失败)。

从我们的角度来看,整个事情出奇地尴尬。如果您无法承受丢失消息的后果,则需要检测传递回调何时失败,并实施某种形式的重试逻辑来覆盖失败的消息。

【讨论】:

以上是关于防止 Confluent Kafka 在生产时丢失消息的主要内容,如果未能解决你的问题,请参考以下文章

kafkacelery与kafka的联用问题

Kafka的消息可靠性(防止消息丢失)

关于kafka数据丢失场景的一次激烈讨论....

python confluent kafka客户端-无法使用SSL访问GKE上的Kafka

Kafka Connect - 无法刷新,等待生产者刷新未完成的消息时超时

Confluent 平台 Kafka Connect 在 137 号出口崩溃