Kafka consumer在项目中的多线程处理方式
Posted benfly
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka consumer在项目中的多线程处理方式相关的知识,希望对你有一定的参考价值。
项目实践:
描述:
RunnableConsumer类:这是一个线程类,实现了Runnable接口,里面创建了一个KafkaConsumer对象,线程启动程序中执行对topic的订阅,并拉取消息
public class RunnableConsumer<K,V> implements Runnable { private Consumer<K,V> consumer; private final IConsumerListener<ConsumerRecords<K,V>> listener; private RunnableConsumer(final IConsumerListener<ConsumerRecords<K,V>> listener, Properties... props) { this.consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass); this.listener = listener; } public void run() { try { consumer.subscribe(topics); while (true) { ConsumerRecords<K,V> records = null; try { //now handle any new record(s) records = consumer.poll(1000); if(records != null && records.count() > 0) { listener.notify(records); } } catch(WakeupException wex) { LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex); } } } catch (Throwable e) { // ignore as we are waking up from the poll, we need to cleanly shutdown our consumer LOGGER.error("getting non-recoverable throwable: ", e); throw e; } finally { //TODO: need to check on consumer closing that any outstanding offset commit is done. //otherwise we need to manually do it here. processCommit(SyncMode.SYNC); LOGGER.info("Trying to close Kafka consumer, ConsumerGroup.isRunning: {}", ConsumerGroup.this.isRunning); consumer.close(); } } }
Listener类:这是一个监听类,用于实际处理某一topic消息,先创建监听对象,在创建cg时,注册到RunnerConsumer类中,如果consumer拉取到消息,则将消息通知监听类去具体处理,不同的业务需要定义不同的业务监听类
修改为第二种方式
如果想使用第二种方式,将数据的处理从consumer中解耦出来,可以将上面的listener修改为一个线程类,在consumer中有拉取到消息,则从线程池中取出线程处理数据,这种方式的一个最大的问题,就是如何保证消息是按顺序处理的,例如,如果一个partition中先后有2条消息,当consumer poll到消息后,将提交到2个线程处理,这就无法保证顺序处理,需要额外的线程同步处理机制。同时因为不需要在consumer中对数据进行处理,consumer的性能也提高了,而且避免了数据处理超时,consumer Rebalance等潜在问题
records = consumer.poll(1000); if(records != null && records.count() > 0) { executor.submit(new listener(records)); }
参考:
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded
https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/
以上是关于Kafka consumer在项目中的多线程处理方式的主要内容,如果未能解决你的问题,请参考以下文章