Apache Kafka 不平衡集群逻辑

Posted

技术标签:

【中文标题】Apache Kafka 不平衡集群逻辑【英文标题】:Apache Kafka Unbalanced Cluster Logic 【发布时间】:2021-07-12 01:27:30 【问题描述】:

我们有一个要求,我们需要通过将 3 个代理集群的流量分配(例如 80% 给代理 1、15% 给代理 2、5% 给代理 3)分配给 Kafka 集群,并为根据broker的流量分布给broker的topic。

为了使用 kafka-python 在 python 编程中实现这个逻辑,我们从 main 函数中调用产生不平衡消息函数。下面提供了实现逻辑的代码示例:-

主要功能

def mf():
   .
   .
   . 
    # create a topic if the topic doesn't exists. Tps_crtn will create new topic if no existing topics found else, will send messages to the existing topics, as usual.
    tpc_list = tps_crtn(base_topic_name=bt, no_of_topics=int(ntp), 
                    topic_partn=int(ptp), 
                    repicas_per_partn=int(rpp))
    #traffic distribution list
    dl = [80,15,5]
    while True:
        
      for ix, topic in enumerate (tpc_list):
        produce_unbalanced_message(topic_name=topic,
         no_of_msgs=int(round((int(nm) * (float(dl[ix])/100.0)))),
         max_wait_time=float(mwt)
if __name__ == "__main__":
    mf()

主函数调用下面提到的生产者发送函数,以便向主题列表中的每个主题发送消息。

不平衡产生消息功能

def produce_unbalanced_message(topic_name='test-topic',
             no_of_msgs=-1,
             max_wait_time=2):
kafka_admin_client: KafkaAdminClient = KafkaAdminClient(
    bootstrap_servers='10.22.151.16:9100'
    )
. 
.
# List of all node ids in the cluster
LOG.info("Fetch the existing Kafka node list")
nodeids: List[int] = [node.nodeId for node in kafka_admin_client._client.cluster.brokers()]
for n in nodeids:
print(n)
.
.
.
# sending unbalanced messages to Kafka
producer.send(topic_name,
              key=key,
              value=message)
.
.

根据要求,消息应该根据broker号和相应的流量分布列表发送,而不是主题列表。我们从 producer_unbalanced_message 函数中的 nodeids 列表中获取的代理编号。

但是,在按照流量分布列表参数测试超过三个主题的代码时,我们遇到了-index out of bound 错误。这是因为我们一增加它们的值就在主题列表中,流量列表分布值不匹配,因为它们是根据代理设置的。

谁能建议应该尝试哪些更改,以便根据从nodeids列表和相应的流量分配列表获得的代理编号而不是根据主题列表发送消息?

【问题讨论】:

您将要定义自己的分区函数。否则,您无法通过其 ID 来定位特定代理以获取记录(密钥通过哈希函数,并且您不能保证此处不会发生哈希冲突) @OneCricketeer 假设我们定义了一个分区函数,我们在其中获取代理的节点 ID,然后在遍历节点 ID 时在 for 循环中调用静态 if 块,如您在下面提到的,相应地发送消息? 代理 ID 不映射到分区,所以我不明白这对你有什么帮助。您需要描述主题,然后找到领导分区,并确保没有重叠,并确保代理不会在您生产(或仅使用一个副本)时重新平衡分区 【参考方案1】:

流量列表分布值不匹配,因为它们是根据代理设置的

您的两个列表是耦合的。如果您的主题比“分发列表”更多/更少,那么您不能使用一个列表的索引来访问另一个列表。

IMO,如果你有一个静态定义的 if 块,它会更具可读性,因为实际上不需要获取主题列表来创建不平衡的集群。

如果您想要 100% 的分布,只需使用随机范围

import random

while True:
   value = random.random()
   topic = None
   if 0 <= value < 0.80:
        topic = 't1'
   elif 0.80 <= value < 0.95:
        topic = 't2'
   else:
        topic = 't3' 
   
   print('Produce to topic ' + topic)

如果您真的希望主题“不平衡”,您将需要验证主题是否只有一个副本并且也由不同的代理托管

【讨论】:

感谢您的建议。只是想了解上面提到的sn-p中如果我们不使用主题列表那么t1,t2和t3将如何对应我们要产生数据的主题呢? 这些字符串是主题名称。

以上是关于Apache Kafka 不平衡集群逻辑的主要内容,如果未能解决你的问题,请参考以下文章

Kafka重平衡机制

kafka 分区分配及再平衡总结

Kafka Connect 进入重新平衡循环

Kafka分区管理

Kafka 进行机器扩容后的副本再平衡 和 为已有分区增加 replica 实践

kafkaKafka Connect中的增量协同平衡