kafka消费者动态检测添加的主题

Posted

技术标签:

【中文标题】kafka消费者动态检测添加的主题【英文标题】:kafka consumer to dynamically detect topics added 【发布时间】:2016-07-09 07:18:21 【问题描述】:

我正在使用 KafkaConsumer 来消费来自 Kafka 服务器(主题)的消息..

它适用于在启动消费者代码之前创建的主题...

但问题是,如果主题是动态创建的(我的意思是在消费者代码启动之后),它将无法工作,但 API 表示它将支持动态主题创建..这是供您参考的链接..

使用的Kafka版本:0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

这里是JAVA代码...

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try 
         while(true) 
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) 
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) 
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             
         
      finally 
       consumer.close();
     

注意:我的主题名称与正则表达式匹配。 如果我重新启动消费者,那么它将开始阅读推送到主题的消息......

非常感谢任何帮助...

【问题讨论】:

【参考方案1】:

这是使用 KafkaConsumer api 为我工作的解决方案。这是它的Java代码。

private static Consumer<Long, String> createConsumer(String topic) 
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "KafkaExampleConsumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    // Create the consumer using props.
    final Consumer<Long, String> consumer =
            new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(topic));
    return consumer;


public static void runConsumer(String topic) throws InterruptedException 
    final Consumer<Long, String> consumer = createConsumer(topic);

    ConsumerRecords<Long, String> records = consumer.poll(100);
    for (ConsumerRecord<Long, String> record : records)
        System.out.printf("hiiiii offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    consumer.commitAsync();
    consumer.close();
    //System.out.println("DONE");

使用它,我们可以使用来自动态创建的主题的消息。

【讨论】:

【参考方案2】:

您可以连接到 Zookeeper。查看the sample code。本质上,您将在 Zookeeper 节点/brokers/topics 上创建一个观察者。当在这里添加新的孩子时,这是一个正在添加的新主题,您的观察者将被触发。

请注意,这个答案和另一个答案之间的区别在于,这个答案是一个触发器,另一个是轮询 - 这个将尽可能接近实时,另一个将在您的轮询间隔内是最好的。

【讨论】:

感谢您的回复和帮助...基本上我想使用 KafkaConsumer api 来实现这一点,我自己解决了.. @madlad 在下面看到我的回答。 '示例代码'链接无效,问题也是关于消费消息,而不仅仅是了解新主题....新主题将在 consumer.listTopics().keySet( ) 无论如何 链接已修复 -- 还添加了关于两种方法之间差异的行。【参考方案3】:

在 apache kafka 邮件档案中有一个答案。我在下面复制它:

消费者支持配置选项“metadata.max.age.ms” 它基本上控制获取主题元数据的频率。经过 默认情况下,这设置得相当高(5 分钟),这意味着它需要 最多 5 分钟来发现与您的常规主题相匹配的新主题 表达。您可以将其设置得较低以更快地发现主题。

所以在你的道具中你可以:

props.put("metadata.max.age.ms", 5000);

这将使您的消费者每 5 秒发现一次新主题。

【讨论】:

这还取决于您如何设置“auto.offset.reset”消费者属性。如果它是“最新的”,它们将选择来自已知主题(在消费者开始之后)的最新/[之前未消费]消息,但不是动态主题。如果您将其设置为“最早”并且还放置了 consumer.seekToBeginning(consumer.assignment());在 poll 之前 - 只做一次,然后它会识别动态/新主题,但它也会每次都从头开始获取所有记录 我们能以某种方式强制获取元数据请求吗?例如consumer.fetchMeta() 什么的?

以上是关于kafka消费者动态检测添加的主题的主要内容,如果未能解决你的问题,请参考以下文章

kafka过滤/动态主题创建

Flink Connectors之消费Kafka数据相关参数以及API说明

kafka如何动态消费新增topic主题

具有动态数量的并行消费者的 Kafka 工作队列

kafka常用生产者消费者配置

绍圣--kafka之消费者(八)