如何在 kafka 0.9.0 中使用多线程消费者?
Posted
技术标签:
【中文标题】如何在 kafka 0.9.0 中使用多线程消费者?【英文标题】:How to use multi-thread consumer in kafka 0.9.0? 【发布时间】:2016-07-16 09:38:50 【问题描述】:kafka 的文档给出了一个方法,描述如下:
每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例。
我的代码:
public class KafkaConsumerRunner implements Runnable
private final AtomicBoolean closed = new AtomicBoolean(false);
private final CloudKafkaConsumer consumer;
private final String topicName;
public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName)
this.consumer = consumer;
this.topicName = topicName;
@Override
public void run()
try
this.consumer.subscribe(topicName);
ConsumerRecords<String, String> records;
while (!closed.get())
synchronized (consumer)
records = consumer.poll(100);
for (ConsumerRecord<String, String> tmp : records)
System.out.println(tmp.value());
catch (WakeupException e)
// Ignore exception if closing
System.out.println(e);
//if (!closed.get()) throw e;
// Shutdown hook which can be called from a separate thread
public void shutdown()
closed.set(true);
consumer.wakeup();
public static void main(String[] args)
CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
.withBootstrapServers("172.31.1.159:9092")
.withGroupId("test")
.build();
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
executorService.shutdown();
但它不起作用并抛出异常:
java.util.ConcurrentModificationException: KafkaConsumer 对多线程访问不安全
此外,我阅读了 Flink(分布式流和批处理数据处理的开源平台)的源代码。使用多线程消费者的 Flink 和我的类似。
long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
pollLoop: while (running)
ConsumerRecords<byte[], byte[]> records;
//noinspection SynchronizeOnNonFinalField
synchronized (flinkKafkaConsumer.consumer)
try
records = flinkKafkaConsumer.consumer.poll(pollTimeout);
catch (WakeupException we)
if (running)
throw we;
// leave loop
continue;
flink code of mutli-thread
怎么了?
【问题讨论】:
【参考方案1】:Kafka 消费者不是线程安全的。正如您在问题中指出的那样,该文件指出
一个简单的选择是给每个线程一个自己的消费者实例
但在您的代码中,您拥有由不同 KafkaConsumerRunner 实例包装的相同消费者实例。因此,多个线程正在访问同一个消费者实例。 kafka文档明确说明
Kafka 消费者不是线程安全的。所有网络 I/O 都发生在 进行调用的应用程序的线程。这是责任 用户确保多线程访问正确 同步。非同步访问将导致 ConcurrentModificationException。
这正是您收到的例外情况。
【讨论】:
【参考方案2】:它会在您调用订阅时引发异常。 this.consumer.subscribe(topicName);
像这样将该块移动到同步块中:
@Override
public void run()
try
synchronized (consumer)
this.consumer.subscribe(topicName);
ConsumerRecords<String, String> records;
while (!closed.get())
synchronized (consumer)
records = consumer.poll(100);
for (ConsumerRecord<String, String> tmp : records)
System.out.println(tmp.value());
catch (WakeupException e)
// Ignore exception if closing
System.out.println(e);
//if (!closed.get()) throw e;
【讨论】:
为我工作。【参考方案3】:也许不是你的情况,但如果你正在合并处理多个主题的数据,那么你可以从同一个消费者的多个主题中读取数据。如果没有,那么最好创建使用每个主题的单独作业。
【讨论】:
以上是关于如何在 kafka 0.9.0 中使用多线程消费者?的主要内容,如果未能解决你的问题,请参考以下文章