Kafka Consumer多线程实例续篇

Posted 大数据Kafka技术分享

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Consumer多线程实例续篇相关的知识,希望对你有一定的参考价值。

在上一篇《Kafka Consumer多线程实例》中我们讨论了KafkaConsumer多线程的两种写法:多KafkaConsumer多线程以及单KafkaConsumer多线程。在第二种用法中我使用的是自动提交的方式,省去了多线程提交位移的麻烦。很多人跑来问如果是手动提交应该怎么写?由于KafkaConsumer不是线程安全的,因此我们不能简单地在多个线程中直接调用consumer.commitSync来提交位移。本文将给出一个实际的例子来模拟多线程消费以及手动提交位移。

  本例中包含3个类:

  • ConsumerThreadHandler类:consumer多线程的管理类,用于创建线程池以及为每个线程分配任务。另外consumer位移的提交也在这个类中进行

  • ConsumerWorker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler

  • Main类:测试主方法类

测试代码

详见原文链接~~

测试步骤

1. 首先创建一个测试topic: test-topic,10个分区,并使用kafka-producer-perf-test.sh脚本生产50万条消息

2. 运行Main,假定group.id设置为test-group

3. 新开一个终端,不断地运行以下脚本监控consumer group的消费进度

LAG列全部为0表示consumer group的位移提交正常。值得一提的是,各位可以通过控制consumer.poll的超时时间来控制ConsumerThreadHandler类提交位移的频率。

以上是关于Kafka Consumer多线程实例续篇的主要内容,如果未能解决你的问题,请参考以下文章

Kafka consumer在项目中的多线程处理方式

Apache Kafka系列 多线程Consumer方案

kafka high-level consumer 多线程访问异常

kafka Consumer分区数与多线程消费topic

kafka可以修改分区不

使用多线程 + 多处理的 Python 日志记录