Kafka消费者-消费者进程和线程与主题分区的关系是啥

Posted

技术标签:

【中文标题】Kafka消费者-消费者进程和线程与主题分区的关系是啥【英文标题】:Kafka consumer - what's the relation of consumer processes and threads with topic partitionsKafka消费者-消费者进程和线程与主题分区的关系是什么 【发布时间】:2015-12-04 21:38:00 【问题描述】:

我最近一直在使用 Kafka,对消费者组下的消费者有点困惑。混淆的中心是是否将消费者实现为进程或线程。对于这个问题,假设我使用的是高级消费者。

让我们考虑一个我尝试过的场景。在我的主题中有 2 个分区(为简单起见,我们假设复制因子仅为 1)。我创建了一个消费者 (ConsumerConnector) 进程 consumer1 和组 group1,然后创建了一个大小为 2 的主题计数映射,然后在该进程下生成了 2 个消费者线程 consumer1_thread1consumer1_thread2。看起来 consumer1_thread1 正在使用分区 0 并且 consumer1_thread2 正在使用分区 1。这种行为总是确定性的吗?下面是代码sn-p。 TestConsumer 类是我的消费线程类。

    ...
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(2));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(2);

    int threadNumber = 0;
    for (final KafkaStream stream : streams) 
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    
    ...

现在,让我们考虑另一种情况(我没有尝试过,但很好奇),我启动了 2 个消费者进程 consumer1consumer2,它们都具有相同的组 group1,并且它们每个都是单线程进程.现在我的问题是:

    在这种情况下,两个独立的消费者进程(尽管在同一个组下)如何与分区相关联?和上面的单进程多线程场景有什么区别?

    一般来说,消费者线程或进程如何映射/关联到主题中的分区?

    Kafka 文档确实说过,消费者组下的每个消费者都会消费一个分区。但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者进程?

    关于将消费者实现为进程与线程,我在这里是否遗漏了什么微妙的东西?提前致谢。

【问题讨论】:

【参考方案1】:

一个消费者组可以运行多个消费者实例(多个进程具有相同的group-id)。在消费时每个分区只被组中的一个消费者实例消费

例如如果您的主题包含 2 个分区,并且您启动了一个具有 2 个消费者实例的消费者组 group-A,那么每个消费者实例都将使用来自该主题的特定分区的消息。

如果您使用不同的组 ID group-Agroup-B 启动相同的 2 个使用者,则来自主题的两个分区的消息将被广播到每个分区。因此,在这种情况下,group-A 下运行的消费者实例将拥有来自主题的两个分区的消息,group-B 也是如此。

在他们的documentation上阅读更多信息

编辑:根据您的评论,

我想知道在同一进程下拥有 2 个消费者线程与 2 个消费者进程(两种情况下的组相同)之间的有效区别是什么

消费者group-id 在整个集群中是相同的/全局的。假设您已经使用 2 个线程启动了一个进程,然后使用相同的 groupId 生成了另一个进程(可能在另一台机器上),该进程具有另外 2 个线程,那么 kafka 将添加这 2 个新线程来使用来自该主题的消息。所以最终会有 4 个线程负责消费同一个主题。然后,Kafka 将触发重新平衡以将分区重新分配给线程,因此线程T1 of process P1 正在使用的特定分区可能会被分配给线程T2 of process P2 使用。以下几行来自维基页面

当使用相同的消费者组名称启动一个新进程时,Kafka 会将该进程的线程添加到可用于消费主题的线程集中并触发“重新平衡”。在此重新平衡期间,Kafka 会将可用分区分配给可用线程,可能会将分区移动到另一个进程。如果您混合了新旧业务逻辑,则有可能某些消息会转到旧逻辑。

【讨论】:

我明白你说的。但是,我最初的问题主要关注的是线程与进程。我想知道在同一进程下拥有 2 个消费者线程与 2 个消费者进程(在两种情况下组相同)之间的有效区别是什么? 感谢您的编辑。这真的回答了我的好奇心。与此同时,我对多个进程进行了一些进一步的实验,并看到了重新平衡的发生:)。【参考方案2】:

选择具有相同 ID 的多个消费者组实例与单个消费者组实例的主要设计决策是弹性。例如,如果您有一个带有两个线程的消费者,那么如果这台机器出现故障,您就会失去所有消费者。如果您有两个具有相同 id 的独立消费者组,每个都在不同的主机上,那么它们可以在失败中幸存下来。理想情况下,每个消费者组应该有两个线程,因此如果一个主机出现故障,另一个消费者组使用其休眠线程来占用另一个分区。实际上,总是希望拥有比分区更多的线程来覆盖这个因素。

    您可以在不同的主机上运行每个使用者组。对于给定名称/ID 的单个消费者组,它只会在单个主机上运行,​​因为它在单个运行时环境中管理其所有线程。 Kafka 有一个算法来确定哪些线程/消费者组读取各种主题分区。卡夫卡试图以一种有弹性的方式均匀地分配这些。当一个消费者组失败时,它使其他组中的其他线程能够读取给定的分区。 指消费组中的单个线程。如果线程多于分区,那么其中一些将保持休眠状态,直到其他线程无法提供弹性。 偏好与弹性有关。因此,通过使用相同 ID 设置多个消费者组,我可以在多个主机上运行,​​从而使我的应用程序能够容忍故障。

【讨论】:

你的回答有很多不正确的地方。首先,您可以使用单个消费者组运行多个消费者主机。在组级别上跟踪偏移量,因此使用组来避免组内的线程踩到彼此的脚趾。 IE。如果您设置 2 个主机和 2 个不同的组来提取同一主题,您最终会收到双倍的消息量。 第二件事是Kafka没有确定线程何时失败的算法,您需要手动重新生成失败的线程,否则即使对于高级,组也不会消耗该分区-基于消费者。我用 4 个分区和 8 个线程测试了这个确切的东西,属于失败线程的分区只是保持未使用(即它的偏移在线程重新生成之前不会移动)。 因此,如果您有 4 个分区并产生 8 个使用者线程,则 4 个线程将什么都不做。另一方面,如果你生成 2 个线程,每个线程将消耗 2 个分区。 我已经编辑了我的答案以澄清其含义。如果您使用相同的 id/name 运行多个使用者组,那么工作将分布在多个 JVM 中——我已经对此进行了测试,并且确实发生了。此外,kafka 确实有一个算法来确定线程何时失败,我再次通过杀死消费者组中的一个线程然后观察另一个消​​费者组中具有相同名称的另一个线程占用分区来测试这一点。 我的建议是创建多个具有相同 id 的消费者组,并观察 kafka 将负载分配给消费者组。关键是具有相同的名称/ ID。这就是你如何提高消费者对失败的抵抗力。如果给定消费者组名称的所有消费者组都在同一主机上并且失败,那么您的应用程序将停止从 Kafka 读取。【参考方案3】:

感谢@user2720864的详细解答,但我认为@user2720864在回答中提到的re-allocation案例不正确=>一个分区不能被两个消费者消费。

当有更多消费者(与分区相比)时,每个分区将仅分配给一个消费者,而剩余消费者将保持懒惰,直到一些工作消费者死亡或被移除组。

基于Kafka Consumers document:

在 Kafka 中实现消费的方式是将日志中的分区划分为消费者实例,以便每个实例在任何时候都是“公平份额”分区的独家消费者时间。维护组成员身份的过程由 Kafka 协议动态处理。如果有新实例加入该组,它们将接管该组其他成员的一些分区;如果一个实例死亡,它的分区将被分配给剩余的实例。

还有它的API specification 在“消费者组和主题订阅”部分:

这是通过平衡消费者组中所有成员之间的分区来实现的,以便将每个分区分配给组中的恰好一个消费者。

【讨论】:

以上是关于Kafka消费者-消费者进程和线程与主题分区的关系是啥的主要内容,如果未能解决你的问题,请参考以下文章

Kafka:分发进程和存储

Kafka 分区机制详解

kafka消费者分区分配策略

主题、分区和键

中通消息平台 Kafka 顺序消费线程模型的实践与优化

Kafka 消费者意外地重新平衡