Kafka consumer poll(long)与poll(Duration)的区别

Posted huxi2b

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka consumer poll(long)与poll(Duration)的区别相关的知识,希望对你有一定的参考价值。

最近在StackOverflow碰到的一个问题,即在consumer.poll之后assignment()返回为空的问题,如下面这段代码所示:

consumer.subscribe(Arrays.asList("test"));
consumer.poll(Duration.ofMillis(0));
// consumer.poll(0);
Set<TopicPartition> assignment = consumer.assignment(); // empty!

有意思的是,如果是consumer.poll(0);则assignment不为空。之前我以为poll(long)被标记为“Deprecated”之后使用poll(Duration)是相同的效果,现在看来两者还是要有差别的。为什么poll(0)就能获取到consumer分配方案,而使用poll(Duration)就不能呢?

 

调研了一番之后发现原因如下:在poll(0)中consumer会一直阻塞直到它成功获取了所需的元数据信息,之后它才会发起fetch请求去获取数据。虽然poll可以指定超时时间,但这个超时时间只适用于后面的消息获取,前面更新元数据信息不计入这个超时时间。poll(Duration)这个版本修改了这样的设计,会把元数据获取也计入整个超时时间。由于本例中使用的是0,即瞬时超时,因此consumer根本无法在这么短的时间内连接上coordinator,所以只能赶在超时前返回一个空集合。这就是为什么使用不同版本的poll命令assignment不同的原因。

 

仔细想想为什么社区要做这样的变更?poll(0)这种设计的一个问题在于如果远端的broker不可用了, 那么consumer程序会被无限阻塞下去。用户指定了超时时间但却被无限阻塞,显然这样的设计时有欠缺的。特别是对于Kafka Streams而言,这个设计可能导致的问题在于Stream Thread无法正常关闭。目前源代码中依然有一些无限阻塞的场景,比如之前处理的initTransaction,commitTransaction和abortTransaction也是无限等待。看来后面社区还是需要慢慢地将它们都替换掉,毕竟在分布式系统中没有什么场景是需要绝对地等待的。

以上是关于Kafka consumer poll(long)与poll(Duration)的区别的主要内容,如果未能解决你的问题,请参考以下文章

kafka学习总结017 --- consumer配置参数之max.poll.interval.ms

kafka 重放 重播 从某个时间点或者offset开始消费

kafka随笔

组中的 Kafka 消费者跳过分区

Kafka offset管理

kafka 心跳和 reblance