kafka消费者动态检测添加的主题
Posted
技术标签:
【中文标题】kafka消费者动态检测添加的主题【英文标题】:kafka consumer to dynamically detect topics added 【发布时间】:2016-07-09 07:18:21 【问题描述】:我正在使用 KafkaConsumer 来消费来自 Kafka 服务器(主题)的消息..
它适用于在启动消费者代码之前创建的主题...但问题是,如果主题是动态创建的(我的意思是在消费者代码启动之后),它将无法工作,但 API 表示它将支持动态主题创建..这是供您参考的链接..
使用的Kafka版本:0.9.0.1
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
这里是JAVA代码...
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Pattern r = Pattern.compile("siddu(\\d)*");
consumer.subscribe(r, new HandleRebalance());
try
while(true)
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions())
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value());
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
finally
consumer.close();
注意:我的主题名称与正则表达式匹配。 如果我重新启动消费者,那么它将开始阅读推送到主题的消息......
非常感谢任何帮助...
【问题讨论】:
【参考方案1】:这是使用 KafkaConsumer api 为我工作的解决方案。这是它的Java代码。
private static Consumer<Long, String> createConsumer(String topic)
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// Create the consumer using props.
final Consumer<Long, String> consumer =
new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(topic));
return consumer;
public static void runConsumer(String topic) throws InterruptedException
final Consumer<Long, String> consumer = createConsumer(topic);
ConsumerRecords<Long, String> records = consumer.poll(100);
for (ConsumerRecord<Long, String> record : records)
System.out.printf("hiiiii offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
consumer.commitAsync();
consumer.close();
//System.out.println("DONE");
使用它,我们可以使用来自动态创建的主题的消息。
【讨论】:
【参考方案2】:您可以连接到 Zookeeper。查看the sample code。本质上,您将在 Zookeeper 节点/brokers/topics
上创建一个观察者。当在这里添加新的孩子时,这是一个正在添加的新主题,您的观察者将被触发。
请注意,这个答案和另一个答案之间的区别在于,这个答案是一个触发器,另一个是轮询 - 这个将尽可能接近实时,另一个将在您的轮询间隔内是最好的。
【讨论】:
感谢您的回复和帮助...基本上我想使用 KafkaConsumer api 来实现这一点,我自己解决了.. @madlad 在下面看到我的回答。 '示例代码'链接无效,问题也是关于消费消息,而不仅仅是了解新主题....新主题将在 consumer.listTopics().keySet( ) 无论如何 链接已修复 -- 还添加了关于两种方法之间差异的行。【参考方案3】:在 apache kafka 邮件档案中有一个答案。我在下面复制它:
消费者支持配置选项“metadata.max.age.ms” 它基本上控制获取主题元数据的频率。经过 默认情况下,这设置得相当高(5 分钟),这意味着它需要 最多 5 分钟来发现与您的常规主题相匹配的新主题 表达。您可以将其设置得较低以更快地发现主题。
所以在你的道具中你可以:
props.put("metadata.max.age.ms", 5000);
这将使您的消费者每 5 秒发现一次新主题。
【讨论】:
这还取决于您如何设置“auto.offset.reset”消费者属性。如果它是“最新的”,它们将选择来自已知主题(在消费者开始之后)的最新/[之前未消费]消息,但不是动态主题。如果您将其设置为“最早”并且还放置了 consumer.seekToBeginning(consumer.assignment());在 poll 之前 - 只做一次,然后它会识别动态/新主题,但它也会每次都从头开始获取所有记录 我们能以某种方式强制获取元数据请求吗?例如consumer.fetchMeta() 什么的?以上是关于kafka消费者动态检测添加的主题的主要内容,如果未能解决你的问题,请参考以下文章