Kafka消费者可以并行处理多条消息吗
Posted
技术标签:
【中文标题】Kafka消费者可以并行处理多条消息吗【英文标题】:Can Kafka consumer process multiple messages parallely 【发布时间】:2016-06-06 04:55:48 【问题描述】:想知道 Kafka 消费者(Java 客户端)是否可以并行读取和处理多条消息……我的意思是使用多个线程……我应该使用 rxJava 吗??
1) 这样做是个好方法吗??? 2)根据我的理解,Kafka甚至将每个线程都视为消费者......如果我错了,请纠正我......
3) 并且还想让 Java 客户端作为守护程序服务在 Linux 中运行,以便它连续运行并轮询 Kafka 的消息、读取和处理相同的..这是一个好方法..
【问题讨论】:
【参考方案1】:Kafka支持按分区并行处理消息,可以启动多个消费者,一个kafka客户端一个或多个分区,kafka也可以通过这种方式支持同一个分区顺序处理。
当然,你可以在一个consumer中启动多个线程处理多条消息,但不能保证一个partition的顺序处理。
【讨论】:
让我明白,所以如果我有 3 个消费者(同一组)和 3 个分区,每个分区将被分配给消费者..现在每个消费者一次只能读取和处理一条消息..as我一次有 3 个消费者,只处理了 3 条消息吗?消息不知道它来自哪个分区.. (1)对。 (2) 一个partition只分配给一个consumer,所以同一个partition中的消息可以顺序处理。 当您说“因此可以按顺序处理同一分区中的消息”时,我们无法实现高吞吐量……如果消费者一次只处理一条消息……即例如,生产者每秒发布 40 条消息……但消费者每秒只处理 1 条消息……尽管我们有多个消费者,假设如果我们有 10 个消费者,则每秒发送 10 条消息……????? 可以为topic创建10个patition,然后启动10个consumer每秒消费10条消息【参考方案2】:好的,这里有很多问题
想知道是否可以通过 Kafka 消费者(Java 客户端)并行读取和处理多条消息
java 的 kafka 客户端仅支持串行处理,您可以通过创建多个线程和每个线程一个使用者来并行化 kafka 使用者最多 partitions 您的主题的数量,但是线程很棘手,我建议您使用一些库来实现这一点,例如rapids-kafka-client.
public static void main(String[] args)
ConsumerConfig.<String, String>builder()
.prop(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(GROUP_ID_CONFIG, "stocks")
.topics("stock_changed")
.consumers(7)
.callback((ctx, record) ->
System.out.printf("status=consumed, value=%s%n", record.value());
)
.build()
.consume()
.waitFor();
这样做是个好方法吗???
是的,没问题,创建分区是为了实现并行性
根据我的理解,Kafka 甚至将每个线程都视为消费者...如果我错了,请纠正我...
在 Kafka vanilla 客户端库中,消费者是一个能够从主题的一个或多个分区下载消息的类,该客户端本身不支持多线程,您可以创建多个线程或使用某些库(例如 rapids- kafka-client) 用于此目的,因此您可以为不同的主题分区创建许多消费者,然后并行消费。
并且还希望将 Java 客户端作为守护程序服务在 Linux 中运行,以便它连续运行并轮询 Kafka 以获取消息、读取和处理相同的..这是一个好方法吗..
是的,使用库,编写代码,发布一个 jar,运行它并让它继续处理数据
【讨论】:
以上是关于Kafka消费者可以并行处理多条消息吗的主要内容,如果未能解决你的问题,请参考以下文章