Kafka消费者可以并行处理多条消息吗

Posted

技术标签:

【中文标题】Kafka消费者可以并行处理多条消息吗【英文标题】:Can Kafka consumer process multiple messages parallely 【发布时间】:2016-06-06 04:55:48 【问题描述】:

想知道 Kafka 消费者(Java 客户端)是否可以并行读取和处理多条消息……我的意思是使用多个线程……我应该使用 rxJava 吗??

1) 这样做是个好方法吗??? 2)根据我的理解,Kafka甚至将每个线程都视为消费者......如果我错了,请纠正我......

3) 并且还想让 Java 客户端作为守护程序服务在 Linux 中运行,以便它连续运行并轮询 Kafka 的消息、读取和处理相同的..这是一个好方法..

【问题讨论】:

【参考方案1】:

Kafka支持按分区并行处理消息,可以启动多个消费者,一个kafka客户端一个或多个分区,kafka也可以通过这种方式支持同一个分区顺序处理。

当然,你可以在一个consumer中启动多个线程处理多条消息,但不能保证一个partition的顺序处理。

【讨论】:

让我明白,所以如果我有 3 个消费者(同一组)和 3 个分区,每个分区将被分配给消费者..现在每个消费者一次只能读取和处理一条消息..as我一次有 3 个消费者,只处理了 3 条消息吗?消息不知道它来自哪个分区.. (1)对。 (2) 一个partition只分配给一个consumer,所以同一个partition中的消息可以顺序处理。 当您说“因此可以按顺序处理同一分区中的消息”时,我们无法实现高吞吐量……如果消费者一次只处理一条消息……即例如,生产者每秒发布 40 条消息……但消费者每秒只处理 1 条消息……尽管我们有多个消费者,假设如果我们有 10 个消费者,则每秒发送 10 条消息……????? 可以为topic创建10个patition,然后启动10个consumer每秒消费10条消息【参考方案2】:

好的,这里有很多问题

想知道是否可以通过 Kafka 消费者(Java 客户端)并行读取和处理多条消息

java 的 kafka 客户端仅支持串行处理,您可以通过创建多个线程和每个线程一个使用者来并行化 kafka 使用者最多 partitions 您的主题的数量,但是线程很棘手,我建议您使用一些库来实现这一点,例如rapids-kafka-client.

public static void main(String[] args)
  ConsumerConfig.<String, String>builder()
      .prop(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
      .prop(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
      .prop(GROUP_ID_CONFIG, "stocks")
      .topics("stock_changed")
      .consumers(7)
      .callback((ctx, record) -> 
        System.out.printf("status=consumed, value=%s%n", record.value());
      )
      .build()
      .consume()
      .waitFor();

    这样做是个好方法吗???

是的,没问题,创建分区是为了实现并行性

    根据我的理解,Kafka 甚至将每个线程都视为消费者...如果我错了,请纠正我...

在 Kafka vanilla 客户端库中,消费者是一个能够从主题的一个或多个分区下载消息的类,该客户端本身不支持多线程,您可以创建多个线程或使用某些库(例如 rapids- kafka-client) 用于此目的,因此您可以为不同的主题分区创建许多消费者,然后并行消费。

    并且还希望将 Java 客户端作为守护程序服务在 Linux 中运行,以便它连续运行并轮询 Kafka 以获取消息、读取和处理相同的..这是一个好方法吗..

是的,使用库,编写代码,发布一个 jar,运行它并让它继续处理数据

【讨论】:

以上是关于Kafka消费者可以并行处理多条消息吗的主要内容,如果未能解决你的问题,请参考以下文章

kafka相关术语名词

kafka分区

具有动态数量的并行消费者的 Kafka 工作队列

Kafka学习笔记

kafka消费者java版本读取不到消息怎么办

kafka