Java Apache Kafka生产者元数据更新器和重试逻辑

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Apache Kafka生产者元数据更新器和重试逻辑相关的知识,希望对你有一定的参考价值。

我正在使用Spring用于Apache Kafka,并通过Spring的KafkaTemplate创建了一个使用Kafka生产者(org.apache.kafka.clients.producer)的服务,以将消息发送到主题。在目标Kafka群集上,我禁用了自动主题创建。使用此处列出的生产者配置的组合https://kafka.apache.org/documentation/#producerconfigs,我成功控制了请求重试的次数,重试之间的时间等。

如果我提供的主题不存在,则请求在我期望的时间内超时(达到max.block.ms时)。但是,在超时后,我继续以设置为[[retry.backoff.ms的时间间隔获取日志条目(例如下面的条目),直到达到300000 ms / 5分钟

我无法确定可以更改生产者或代理上的哪个配置属性,以阻止生产者检查5分钟以查看主题是否已创建。

有人可以指出正确的设置,使我可以减少此设置,或者在请求超时后停止检查吗?

日志输入示例

WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater: [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : <specified_topic>=UNKNOWN_TOPIC_OR_PARTITION

使用的生产者配置:

    delivery.timeout.ms = 5000
  • linger.ms = 1000
  • 最大块。毫秒= 8000
  • request.timeout.ms = 4000
  • 最大重试次数= 0
  • retry.backoff.ms = 2000
答案
Kafka Producer在第一个send之前检索和缓存主题/分区元数据。然后,它会定期尝试刷新此元数据,这就是您在日志中观察到的内容。如果在metadata.max.age.ms时间(默认值= 5分钟)内未使用主题,则会将该主题从生产者的缓存中删除,因此将不再尝试查询其元数据。

metadata.max.age.ms来源中的注释似乎是从version 2.3.1继承过来的(顺便说一下,尽管代码进行了相当大的改动?)但是很遗憾,除了清楚当前主题的到期逻辑是什么之外,什么都没做。

2.2

以上是关于Java Apache Kafka生产者元数据更新器和重试逻辑的主要内容,如果未能解决你的问题,请参考以下文章

scala中的Flink Kafka程序给出超时错误org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败

Java实现Kafka生产者和消费者的示例

为啥Kafka消费者连接zookeeper,生产者从broker获取元数据?

java自学网址,源码+原理+手写框架

java开发实战经典视频,面试经历分享

java面试笔试题代码,先收藏了