Kafka消费者-消费者进程和线程与主题分区的关系是啥
Posted
技术标签:
【中文标题】Kafka消费者-消费者进程和线程与主题分区的关系是啥【英文标题】:Kafka consumer - what's the relation of consumer processes and threads with topic partitionsKafka消费者-消费者进程和线程与主题分区的关系是什么 【发布时间】:2015-12-04 21:38:00 【问题描述】:我最近一直在使用 Kafka,对消费者组下的消费者有点困惑。混淆的中心是是否将消费者实现为进程或线程。对于这个问题,假设我使用的是高级消费者。
让我们考虑一个我尝试过的场景。在我的主题中有 2 个分区(为简单起见,我们假设复制因子仅为 1)。我创建了一个消费者 (ConsumerConnector
) 进程 consumer1
和组 group1
,然后创建了一个大小为 2 的主题计数映射,然后在该进程下生成了 2 个消费者线程 consumer1_thread1
和 consumer1_thread2
。看起来 consumer1_thread1
正在使用分区 0
并且 consumer1_thread2
正在使用分区 1
。这种行为总是确定性的吗?下面是代码sn-p。 TestConsumer
类是我的消费线程类。
...
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(2));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
for (final KafkaStream stream : streams)
executor.submit(new TestConsumer(stream, threadNumber));
threadNumber++;
...
现在,让我们考虑另一种情况(我没有尝试过,但很好奇),我启动了 2 个消费者进程 consumer1
和 consumer2
,它们都具有相同的组 group1
,并且它们每个都是单线程进程.现在我的问题是:
在这种情况下,两个独立的消费者进程(尽管在同一个组下)如何与分区相关联?和上面的单进程多线程场景有什么区别?
一般来说,消费者线程或进程如何映射/关联到主题中的分区?
Kafka 文档确实说过,消费者组下的每个消费者都会消费一个分区。但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者进程?
关于将消费者实现为进程与线程,我在这里是否遗漏了什么微妙的东西?提前致谢。
【问题讨论】:
【参考方案1】:一个消费者组可以运行多个消费者实例(多个进程具有相同的group-id
)。在消费时每个分区只被组中的一个消费者实例消费。
例如如果您的主题包含 2 个分区,并且您启动了一个具有 2 个消费者实例的消费者组 group-A
,那么每个消费者实例都将使用来自该主题的特定分区的消息。
如果您使用不同的组 ID group-A
和 group-B
启动相同的 2 个使用者,则来自主题的两个分区的消息将被广播到每个分区。因此,在这种情况下,group-A
下运行的消费者实例将拥有来自主题的两个分区的消息,group-B
也是如此。
在他们的documentation上阅读更多信息
编辑:根据您的评论,
我想知道在同一进程下拥有 2 个消费者线程与 2 个消费者进程(两种情况下的组相同)之间的有效区别是什么
消费者group-id
在整个集群中是相同的/全局的。假设您已经使用 2 个线程启动了一个进程,然后使用相同的 groupId 生成了另一个进程(可能在另一台机器上),该进程具有另外 2 个线程,那么 kafka 将添加这 2 个新线程来使用来自该主题的消息。所以最终会有 4 个线程负责消费同一个主题。然后,Kafka 将触发重新平衡以将分区重新分配给线程,因此线程T1 of process P1
正在使用的特定分区可能会被分配给线程T2 of process P2
使用。以下几行来自维基页面
当使用相同的消费者组名称启动一个新进程时,Kafka 会将该进程的线程添加到可用于消费主题的线程集中并触发“重新平衡”。在此重新平衡期间,Kafka 会将可用分区分配给可用线程,可能会将分区移动到另一个进程。如果您混合了新旧业务逻辑,则有可能某些消息会转到旧逻辑。
【讨论】:
我明白你说的。但是,我最初的问题主要关注的是线程与进程。我想知道在同一进程下拥有 2 个消费者线程与 2 个消费者进程(在两种情况下组相同)之间的有效区别是什么? 感谢您的编辑。这真的回答了我的好奇心。与此同时,我对多个进程进行了一些进一步的实验,并看到了重新平衡的发生:)。【参考方案2】:选择具有相同 ID 的多个消费者组实例与单个消费者组实例的主要设计决策是弹性。例如,如果您有一个带有两个线程的消费者,那么如果这台机器出现故障,您就会失去所有消费者。如果您有两个具有相同 id 的独立消费者组,每个都在不同的主机上,那么它们可以在失败中幸存下来。理想情况下,每个消费者组应该有两个线程,因此如果一个主机出现故障,另一个消费者组使用其休眠线程来占用另一个分区。实际上,总是希望拥有比分区更多的线程来覆盖这个因素。
-
您可以在不同的主机上运行每个使用者组。对于给定名称/ID 的单个消费者组,它只会在单个主机上运行,因为它在单个运行时环境中管理其所有线程。
Kafka 有一个算法来确定哪些线程/消费者组读取各种主题分区。卡夫卡试图以一种有弹性的方式均匀地分配这些。当一个消费者组失败时,它使其他组中的其他线程能够读取给定的分区。
指消费组中的单个线程。如果线程多于分区,那么其中一些将保持休眠状态,直到其他线程无法提供弹性。
偏好与弹性有关。因此,通过使用相同 ID 设置多个消费者组,我可以在多个主机上运行,从而使我的应用程序能够容忍故障。
【讨论】:
你的回答有很多不正确的地方。首先,您可以使用单个消费者组运行多个消费者主机。在组级别上跟踪偏移量,因此使用组来避免组内的线程踩到彼此的脚趾。 IE。如果您设置 2 个主机和 2 个不同的组来提取同一主题,您最终会收到双倍的消息量。 第二件事是Kafka没有确定线程何时失败的算法,您需要手动重新生成失败的线程,否则即使对于高级,组也不会消耗该分区-基于消费者。我用 4 个分区和 8 个线程测试了这个确切的东西,属于失败线程的分区只是保持未使用(即它的偏移在线程重新生成之前不会移动)。 因此,如果您有 4 个分区并产生 8 个使用者线程,则 4 个线程将什么都不做。另一方面,如果你生成 2 个线程,每个线程将消耗 2 个分区。 我已经编辑了我的答案以澄清其含义。如果您使用相同的 id/name 运行多个使用者组,那么工作将分布在多个 JVM 中——我已经对此进行了测试,并且确实发生了。此外,kafka 确实有一个算法来确定线程何时失败,我再次通过杀死消费者组中的一个线程然后观察另一个消费者组中具有相同名称的另一个线程占用分区来测试这一点。 我的建议是创建多个具有相同 id 的消费者组,并观察 kafka 将负载分配给消费者组。关键是具有相同的名称/ ID。这就是你如何提高消费者对失败的抵抗力。如果给定消费者组名称的所有消费者组都在同一主机上并且失败,那么您的应用程序将停止从 Kafka 读取。【参考方案3】:感谢@user2720864的详细解答,但我认为@user2720864在回答中提到的re-allocation案例不正确=>一个分区不能被两个消费者消费。
当有更多消费者(与分区相比)时,每个分区将仅分配给一个消费者,而剩余消费者将保持懒惰,直到一些工作消费者死亡或被移除组。
基于Kafka Consumers document:
在 Kafka 中实现消费的方式是将日志中的分区划分为消费者实例,以便每个实例在任何时候都是“公平份额”分区的独家消费者时间。维护组成员身份的过程由 Kafka 协议动态处理。如果有新实例加入该组,它们将接管该组其他成员的一些分区;如果一个实例死亡,它的分区将被分配给剩余的实例。
还有它的API specification 在“消费者组和主题订阅”部分:
这是通过平衡消费者组中所有成员之间的分区来实现的,以便将每个分区分配给组中的恰好一个消费者。
【讨论】:
以上是关于Kafka消费者-消费者进程和线程与主题分区的关系是啥的主要内容,如果未能解决你的问题,请参考以下文章