如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?
Posted
技术标签:
【中文标题】如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?【英文标题】:How to use Kafka consumer "seek" properly to go back to uncommitted offsets for all partitions? 【发布时间】:2020-09-23 00:02:07 【问题描述】:对于 Java Kafka Consumer seek()
函数,它要求我们传入 TopicPartion
和 Offest
。但是,我认为这个 seek 方法会为我的消费者获取订阅的 TopicPartitions 集合。
这是我正在尝试处理的示例。
消费者 A 订阅了主题“test-topic”分区 1 和 2。当我调用 poll()
时,我从每个分区读取消息。我处理了一些消息,但我的应用程序出现异常。我不打电话给commitSync()
。现在我想回退到我在上次poll()
上检索到的那些偏移量,并尝试重新处理它们。那么我该怎么做呢?我是否需要检查每个主题分区的最后提交的偏移量并为每个分区调用seek()
?多次调用seek()
是否只接受最后一次调用的seek()
?正如我所说,我想确保我的消费者返回所有分区,这样我就不会丢失任何已分配分区上的任何数据。
【问题讨论】:
【参考方案1】:我处理了一些消息,但我的应用程序出现异常。我不 调用 commitSync()
如果您不调用commitSync()
,则不会提交消息。如果假设异常杀死了您的程序,那么在重新启动后,消费者通常会从最后提交的偏移量中读取它。
您可能还想检查auto.offset.reset 并将其设置为earliest
。
检查您的消息是否自动提交,因为您正在执行commitSync()
,所以您不需要自动提交,即enable.auto.commit
可以设置为false
(在 Confluent Kafka 中默认为true
)
如果你的程序没有被异常终止,你总是有消耗的记录。您可以重试处理每条记录,然后提交。
ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord record: records)
tryProcess(record, 3);
consumer.commitSync();
void tryProcess(ConsumerRecord record, int maxRetries)
if(maxRetries < 1)
log.warn("max retries exhausted for record");
return;
try
process(record);
catch(Exception ex)
tryProcess(record, --maxRetries);
您也可以尝试通过重试处理批量记录,而不是像tryProcess(records, 3)
这样的记录对应于ConsumerRecords
的每条记录,并且该批重试3 次。我觉得没必要找。
我仍然对 seek() api 的用法感到好奇
seek()
可以用于例如,当我们不使用订阅时,即consumer.subscribe()
,而是consumer.assign()
,当我们只是想偷看(查看)主题中的消息时通常会这样做,例如控制台消费者。有时,我们可能需要在某个偏移量之后查看一些消息,或者最后 n 条消息等,但实际上并没有对它们做任何事情,而只是显示。
【讨论】:
我的应用程序没有崩溃 - 我需要让消费者保持活动状态,所以我不会重新启动应用程序,它必须是自我修复的。 那么,你看到上面的代码sn-p有什么问题吗?解决方案对您来说可行吗? 是的,如果我需要它继续重试直到成功,我想这会起作用。不过,我仍然对seek()
api 的使用感到好奇。
@alex 我用seek()
的可能用例更新了答案以上是关于如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?的主要内容,如果未能解决你的问题,请参考以下文章