超过每个分区允许的最大接收器数量 eventthub 中的错误

Posted

技术标签:

【中文标题】超过每个分区允许的最大接收器数量 eventthub 中的错误【英文标题】:Exceeded the maximum number of allowed receivers per partition Error in eventhub 【发布时间】:2019-10-31 02:57:11 【问题描述】:

就像下面的代码一样,我不断地从 azure eventthub 接收数据。我经常看到错误提示“超出了每个分区允许的最大接收器数量”,我知道它来自哪里。

import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

logger = logging.getLogger("azure")

ADDRESS = ""
USER = ""
KEY = ""

CONSUMER_GROUP = "$default"
OFFSET = Offset("@latest")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET)    
client.run()
start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: ".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received  messages in  seconds".format(total, run_time))

这里的行是添加接收器的行,如果我添加超过五个接收器,它会达到每个分区可能接收器数量的限制。

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0
    因此,我尝试删除带有功能的接收器。 比如使用receiver.client.clients.removereceiver.client.clients.clear(),清除之前添加的接收者。但是,这些方法似乎都不起作用。

我看到这个错误的原因是因为每当我需要停止运行脚本进行调试时,我都会运行上面的整个代码,所以每当我重新运行它时,我都必须运行receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0这一行

    我还尝试只运行“add_receiver”行下方的部分代码
client.run()
start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: ".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received  messages in  seconds".format(total, run_time))

但是,我看到另一个错误说 EventHubError: This receive handler is now closed.

有什么方法可以解决这个问题?

【问题讨论】:

嗨,Brian,你能解释一下这个“如果我添加超过五个接收器,它会达到 xxxx 的限制”吗? @IvanYang 这意味着 Azure 事件中心限制了每个消费者的接收者数量,而我正在达到这个限制。 @IvanYang 抱歉,有错别字。 你说你知道这个错误来自哪里。你能给我解释一下吗?我遇到了同样的错误,最近才开始使用 azure 【参考方案1】:

经过努力,我认为这可能是解决此问题的方法。

对于上面的第 22 行,我可以简单地添加“keep_alive”输入,例如:

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET, keep_alive = 10000000)  

这样,我可以让客户端接收器保持打开状态,而不会收到错误消息“EventHubError:此接收处理程序现已关闭。”,然后只需从下面运行唯一的部分:

start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: ".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received  messages in  seconds".format(total, run_time))

【讨论】:

我遇到了同样的问题,但使用的是 Nodejs API。 nodejs 代码没有参数 keep_alive 。你知道如何解决这个问题吗?任何帮助表示赞赏。我现在正在寻找一个星期的解决方案

以上是关于超过每个分区允许的最大接收器数量 eventthub 中的错误的主要内容,如果未能解决你的问题,请参考以下文章

11g自动分区超过最大限制

将RDD划分为每个分区中元素数量固定的分区

如果群发邮件时数量超过 100 万,怎么在一个小时之内发完

减少 spotify URI 以接近允许轨道的最大数量

linux 打开文件数too many open files解决方法

分区数量超过消费者时的 Apache Kafka 消息消费