Kafka如何并行消费一个主题
Posted
技术标签:
【中文标题】Kafka如何并行消费一个主题【英文标题】:Kafka how to consume one topic parallel 【发布时间】:2014-10-25 11:27:19 【问题描述】:看了kafka文档,还是不知道怎么并行消费一个topic?
假设: 我有一个话题,比如“发生了什么事”(不要拆分这个话题),而且我有很多客户想要消费它。 那么我应该怎么做,让多个客户可以并行消费呢?我应该使用分区和客户组吗?
我对此有一个想法,但不确定是否正确。
对同一个topic做很多partition,对一个customer做一个partition,所以一个producer必须对这些partition做同样的,每个customer在不同的customer group,是不是?
【问题讨论】:
【参考方案1】:使用分区是一种能够并行处理主题的方式。假设您的主题有 10 个分区,那么您可以在同一个消费者组中有 10 个消费者,每个消费者读取一个分区。如果您的消费者少于分区,那么他们每个人将负责一个以上的分区。如果您的消费者多于分区,那么有些消费者不会获得分配给他们的任何分区,并且除了可以替换另一个已经死亡的消费者之外无事可做。
【讨论】:
so kafka fix 一个客户必须映射一个分区,一个分区也必须映射一个分区? 一个消费者映射到一个或多个分区。【参考方案2】:Kafka 中的每个主题都可以组织成许多分区。分区允许并行消耗增加吞吐量。
生产者使用 Kafka 生产者客户端库将消息发布到主题,该库使用分区器在可用分区之间平衡消息。生产者连接到的代理负责使用 Zookeeper 中的分区所有者信息将消息发送到该分区的领导者代理。消费者使用 Kafka 的高级消费者库(它处理代理领导者的更改、管理 Zookeeper 中的偏移量信息以及隐式地找出分区所有者信息等)来消费来自流中分区的消息;每个流可能会映射到几个分区,具体取决于消费者选择创建消息流的方式。
例如,如果一个主题有 10 个分区和 3 个消费者实例(C1、C2、C3 按此顺序启动)都属于同一个消费者组,我们可以有不同的消费模型来允许读取并行,如下所示
每个消费者使用一个流。
在这个模型中,当 C1 启动时,主题的所有 10 个分区都映射到同一个流,并且 C1 开始从该流中消费。当 C2 启动时,Kafka 重新平衡两个流之间的分区。因此,每个流将被分配到 5 个分区(取决于重新平衡算法,它也可能是 4 对 6)并且每个消费者从其流中消费。同样,当 C3 启动时,分区再次在 3 个流之间重新平衡。请注意,在此模型中,当从分配给多个分区的流中消费时,分区之间的消息顺序将混乱。
每个消费者使用多个流 (比如 C1 使用 3,C2 使用 3,C3 使用 4)。在这个模型中,当 C1 启动时,所有 10 个分区都分配给 3 个流,C1 可以使用多个线程同时从 3 个流中消费。当 C2 启动时,分区在 6 个流之间重新平衡,类似地,当 C3 启动时,分区在 10 个流之间重新平衡。每个消费者可以同时从多个流中消费。请注意,这里的流和分区的数量是相等的。如果流的数量超过分区,一些流将不会收到任何消息,因为它们不会被分配任何分区。
【讨论】:
【参考方案3】:@Lundahl 做了所有的教学,我会给你一个实用的样本。
-
为某些含义创建一个主题,例如
news_events
使用您的消费者需要的并行度(分区),您可以使用处理一条消息的时间、您将拥有的消息数量以及您希望处理所有消息的时间来计算。
让我们为该主题创建消费者,您不想阅读新闻和您的兄弟姐妹,每个人都在您的时间,然后每个人都需要一个消费者组ID,这样kafka就会知道线程a,b ,c 代表一个消费组,d,e,c 代表第二个消费组,每个消费组都会收到相同的消息,按时处理,不会相互影响。
一条消息将来自一个或另一个分区,永远不会在两个分区,默认情况下,Kafka 会循环选择分区,请记住,所有消费者组都可以连接并从所有相同分区读取数据
我建议您使用 rapids-kafka-client,一个为您执行并行处理的库,选择线程数等于您拥有的分区数,选择一个使用者组,然后看看奇迹发生。
public static void main(String[] args)
ConsumerConfig.<String, String>builder()
.prop(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(GROUP_ID_CONFIG, "news-app")
.topics("news_events")
.consumers(7)
.callback((ctx, record) ->
System.out.printf("status=consumed, value=%s%n", record.value());
)
.build()
.consume()
.waitFor();
您可以阅读更多关于消费者组、主题和分区的信息here
【讨论】:
【参考方案4】:我假设您想要的是客户之间以发布/订阅方式并行消费。
除此之外,您还可以在单个客户中进行并行消费,以扩展消费者应用程序。
客户之间并行消费
如果您所说的“客户”是指对独立消费主题消息感兴趣的不同组织,那么您所需要的只是消费者群体。
这是一个简单的发布/订阅模式,每个客户运行自己的应用程序并阅读所有主题的消息,而不会干扰其他人。 每个客户应用程序都可以被视为一个消费者组,由一个或多个 Kafka 消费者(无论是运行在单个节点上还是分布在集群中)组成,所有这些都共享消费者组的标识符。
无论分区如何,您都可以实现此目标。 如果主题是分区的,您无需担心将相同的消息写入所有分区。请记住,在 Kafka 消息是durable 中,Kafka 消费者读取的消息不会被删除,并且可供来自不同消费者组的其他 Kafka 消费者读取(直到它过期)。此外,分区并不意味着像这样工作,它们有助于扩展数据存储(在某一点上,所有主题的数据都不会只适合一个节点)并扩展消费者应用程序,如下所示。
单个客户内并行消费
您可以进一步并行化,或者更好地说,扩展消费者组内的消息消费,实际上是 Kafka 消费者。
假设主题很大,生产者写入其中的速率很高,而消费者组只有一个消费者:这个可怜的消费者可能难以跟上消息到达率,尤其是在消息处理也很耗时的情况下。 在这种情况下,您需要分区和消费者组中的更多消费者,以便 Kafka 将分区分配给消费者以在它们之间分配读取负载。
分区分配的工作原理已在此处的其他答案中进行了解释,但基本上是针对给定的消费者组:
每个主题的分区都专门分配给一个消费者, 消费者可能会被分配更多分区 如果消费者不仅仅是主题的分区,它们中的一些将保持空闲,因为它们不会被分配任何分区来消费。请记住,Kafka中的消息排序只保证在分区级别,所以如果您有很多分区和排序事项,您需要根据您的要求选择正确的消息键来对数据进行分区。
例如,如果您希望消息按设备排序,device_id
将是您的密钥,以保证同一设备的消息将写入同一分区。
【讨论】:
以上是关于Kafka如何并行消费一个主题的主要内容,如果未能解决你的问题,请参考以下文章