Kafka Java consumer动态修改topic订阅
Posted 大数据Kafka技术分享
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Java consumer动态修改topic订阅相关的知识,希望对你有一定的参考价值。
和KafkaProducer不同的是,KafkaConsumer不是线程安全的,所以我们不能直接在没有同步保护的机制下直接启用另一个线程调用consumer的任何方法(除了wakeup)。因此,实现这个需求有两种途径:
使用重量级的synchorinzed机制来实现线程安全
借助Java类库已有的线程安全数据结构来实现
如果是第一种方式,则无论哪个线程访问consumer都必须要配备必要的同步保护机制,代价相当大且极易出错。本文选取第二种方式,我们可以借助Java提供的ConcurrentLinkedQueue来帮助我们实现。具体的步骤为:
构建ConcurrentLinkedQueue对象分别给两个线程使用(这里并不限定于两个线程,但这个需求最可能的实际场景是consumer主线程和一个后台管理类的用户线程,而后者负责触发“动态修改订阅”逻辑)
调用KafkaConsumer.poll(timeout)来不断消费消息。经常有人问这里的timeout到底是做什么用的?这里统一回答一下:这里的timeout赋予了用户在consumer读取消息后可以执行其他一些操作的能力,比如定期的记录日志等。如果你的consumer没有这样的需求,那么调用KafkaConsumer.poll(1000)和KafkaConsumer.poll(Integer.MAX)没有任何区别。事实上, 我们更加推荐用户使用KafkaConsumer.poll(Integer.MAX) + wakeup的方式来响应后端其他逻辑!
每次poll之后尝试去探查一下ConcurrentLinkedQueue有没有新东西(如果有说明订阅topic列表发生变化),响应之
使用另一个线程往ConcurrentLinkedQueue中插入新的订阅信息
完整样例代码如下:
public
class
ConsumerTest {
以上是关于Kafka Java consumer动态修改topic订阅的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot中使用Kafka报错:Failed to construct kafka consumer
how to read from __consumer_offsets topic
kafka java producer consumer实践
如何修复 Spark Streaming Kafka Consumer 中的“java.io.NotSerializableException:org.apache.kafka.clients.con