永远运行 kafka 消费者(新的消费者 API)
Posted
技术标签:
【中文标题】永远运行 kafka 消费者(新的消费者 API)【英文标题】:Running kafka consumer(new Consumer API) forever 【发布时间】:2016-10-31 22:22:14 【问题描述】:我在Apache Kafka
上建立了一个排队系统。该应用程序将向特定的Kafka topic
生成消息,并且在消费者端,我必须使用该主题生成的所有记录。
我使用新的 Java Consumer Api 编写了消费者。
代码看起来像
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true)
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println("Data recieved : "+record.value());
这里我需要永远运行消费者,以便生产者推送到 kafka 主题的任何记录都应该立即消费和处理。 所以我的困惑是,使用无限while循环(如示例代码中)来消耗数据是否正确?
【问题讨论】:
【参考方案1】:虽然可以无限循环,但可以在 Kafka 消费者 documentation 中找到一种稍微优雅的方法,如下所示:
public class KafkaConsumerRunner implements Runnable
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run()
try
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get())
ConsumerRecords records = consumer.poll(10000);
// Handle new records
catch (WakeupException e)
// Ignore exception if closing
if (!closed.get()) throw e;
finally
consumer.close();
// Shutdown hook which can be called from a separate thread
public void shutdown()
closed.set(true);
consumer.wakeup();
这使您可以选择使用钩子正常关闭。
【讨论】:
【参考方案2】:是的,您可以使用无限循环。实际上,这不是一个繁忙的循环。在每次轮询期间,如果数据不可用,呼叫将等待给定的时间段。
long millisToWait = 100;
consumer.poll(millisToWait);
新消费者自动处理网络通信问题。确保消费者在关闭时优雅地关闭。
【讨论】:
【参考方案3】:是的,这是使用无限循环来消耗数据的正确方法。
消费者通常是长时间运行的应用程序 不断轮询 Kafka 以获取更多数据。消费者必须继续轮询 Kafka,否则他们将被考虑 死了,他们正在消耗的分区将被交给另一个 组中的消费者继续消费。
poll() 返回记录列表。每条记录都包含主题和分区 记录来自,分区内记录的偏移量,以及 键和记录的值。记录的处理是特定于应用程序的。
如果您退出循环,请始终在退出之前关闭()消费者。这将关闭网络连接和套接字,还会立即触发重新平衡。
【讨论】:
【参考方案4】:它对我有用,但您可能希望将内部循环放在 try/catch 块中,以防抛出任何异常。如果您断开连接,还可以考虑定期重新连接任务。
【讨论】:
以上是关于永远运行 kafka 消费者(新的消费者 API)的主要内容,如果未能解决你的问题,请参考以下文章