Kafka Consumer多线程实例续篇
Posted 大数据Kafka技术分享
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Consumer多线程实例续篇相关的知识,希望对你有一定的参考价值。
在上一篇《Kafka Consumer多线程实例》中我们讨论了KafkaConsumer多线程的两种写法:多KafkaConsumer多线程以及单KafkaConsumer多线程。在第二种用法中我使用的是自动提交的方式,省去了多线程提交位移的麻烦。很多人跑来问如果是手动提交应该怎么写?由于KafkaConsumer不是线程安全的,因此我们不能简单地在多个线程中直接调用consumer.commitSync来提交位移。本文将给出一个实际的例子来模拟多线程消费以及手动提交位移。
本例中包含3个类:
ConsumerThreadHandler类:consumer多线程的管理类,用于创建线程池以及为每个线程分配任务。另外consumer位移的提交也在这个类中进行
ConsumerWorker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler
Main类:测试主方法类
测试代码
详见原文链接~~
测试步骤
1. 首先创建一个测试topic: test-topic,10个分区,并使用kafka-producer-perf-test.sh脚本生产50万条消息
2. 运行Main,假定group.id设置为test-group
3. 新开一个终端,不断地运行以下脚本监控consumer group的消费进度
LAG列全部为0表示consumer group的位移提交正常。值得一提的是,各位可以通过控制consumer.poll的超时时间来控制ConsumerThreadHandler类提交位移的频率。
以上是关于Kafka Consumer多线程实例续篇的主要内容,如果未能解决你的问题,请参考以下文章