永远运行 kafka 消费者(新的消费者 API)

Posted

技术标签:

【中文标题】永远运行 kafka 消费者(新的消费者 API)【英文标题】:Running kafka consumer(new Consumer API) forever 【发布时间】:2016-10-31 22:22:14 【问题描述】:

我在Apache Kafka 上建立了一个排队系统。该应用程序将向特定的Kafka topic 生成消息,并且在消费者端,我必须使用该主题生成的所有记录。 我使用新的 Java Consumer Api 编写了消费者。 代码看起来像

  Properties props = new Properties();  
                     props.put("bootstrap.servers", kafkaBrokerIp+":9092");  
                     props.put("group.id",groupId);  
                     props.put("enable.auto.commit", "true");
                     props.put("session.timeout.ms", "30000");
                     props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
                     consumer.subscribe(Arrays.asList("consumertest"));  
                     while (true)   
                         ConsumerRecords<String, String> records = consumer.poll(100);  
                         for (ConsumerRecord<String, String> record : records)  
                             System.out.println("Data recieved : "+record.value());  
                               
                     

这里我需要永远运行消费者,以便生产者推送到 kafka 主题的任何记录都应该立即消费和处理。 所以我的困惑是,使用无限while循环(如示例代码中)来消耗数据是否正确?

【问题讨论】:

【参考方案1】:

虽然可以无限循环,但可以在 Kafka 消费者 documentation 中找到一种稍微优雅的方法,如下所示:

public class KafkaConsumerRunner implements Runnable 
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() 
        try 
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) 
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            
         catch (WakeupException e) 
            // Ignore exception if closing
            if (!closed.get()) throw e;
         finally 
           consumer.close();
        
    

    // Shutdown hook which can be called from a separate thread
    public void shutdown() 
        closed.set(true);
        consumer.wakeup();
    

这使您可以选择使用钩子正常关闭。

【讨论】:

【参考方案2】:

是的,您可以使用无限循环。实际上,这不是一个繁忙的循环。在每次轮询期间,如果数据不可用,呼叫将等待给定的时间段。

long millisToWait = 100;
consumer.poll(millisToWait);

新消费者自动处理网络通信问题。确保消费者在关闭时优雅地关闭。

【讨论】:

【参考方案3】:

是的,这是使用无限循环来消耗数据的正确方法。

消费者通常是长时间运行的应用程序 不断轮询 Kafka 以获取更多数据。消费者必须继续轮询 Kafka,否则他们将被考虑 死了,他们正在消耗的分区将被交给另一个 组中的消费者继续消费。

poll() 返回记录列表。每条记录都包含主题和分区 记录来自,分区内记录的偏移量,以及 键和记录的值。记录的处理是特定于应用程序的。

如果您退出循环,请始终在退出之前关闭()消费者。这将关闭网络连接和套接字,还会立即触发重新平衡。

【讨论】:

【参考方案4】:

它对我有用,但您可能希望将内部循环放在 try/catch 块中,以防抛出任何异常。如果您断开连接,还可以考虑定期重新连接任务。

【讨论】:

以上是关于永远运行 kafka 消费者(新的消费者 API)的主要内容,如果未能解决你的问题,请参考以下文章

大数据(6f)图解Kafka生产者和消费者API

Kafka消费组核心API与核心参数运行机制剖析

Sparkstreaming and Kafka

Kafka消费者手动提交消息偏移

我应该使用啥:Kafka Stream 或 Kafka 消费者 API 或 Kafka 连接

Kafka核心API——Consumer消费者