超过每个分区允许的最大接收器数量 eventthub 中的错误
Posted
技术标签:
【中文标题】超过每个分区允许的最大接收器数量 eventthub 中的错误【英文标题】:Exceeded the maximum number of allowed receivers per partition Error in eventhub 【发布时间】:2019-10-31 02:57:11 【问题描述】:就像下面的代码一样,我不断地从 azure eventthub 接收数据。我经常看到错误提示“超出了每个分区允许的最大接收器数量”,我知道它来自哪里。
import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset
logger = logging.getLogger("azure")
ADDRESS = ""
USER = ""
KEY = ""
CONSUMER_GROUP = "$default"
OFFSET = Offset("@latest")
PARTITION = "0"
total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET)
client.run()
start_time = time.time()
while True:
for event_data in receiver.receive(timeout=5000):
print("Received: ".format(event_data.body_as_str(encoding='UTF-8')))
a = event_data.body_as_str(encoding='UTF-8')
total += 1
end_time = time.time()
run_time = end_time - start_time
print("Received messages in seconds".format(total, run_time))
这里的行是添加接收器的行,如果我添加超过五个接收器,它会达到每个分区可能接收器数量的限制。
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0
-
因此,我尝试删除带有功能的接收器。
比如使用
receiver.client.clients.remove
或receiver.client.clients.clear()
,清除之前添加的接收者。但是,这些方法似乎都不起作用。
我看到这个错误的原因是因为每当我需要停止运行脚本进行调试时,我都会运行上面的整个代码,所以每当我重新运行它时,我都必须运行receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0
这一行
-
我还尝试只运行“add_receiver”行下方的部分代码
client.run()
start_time = time.time()
while True:
for event_data in receiver.receive(timeout=5000):
print("Received: ".format(event_data.body_as_str(encoding='UTF-8')))
a = event_data.body_as_str(encoding='UTF-8')
total += 1
end_time = time.time()
run_time = end_time - start_time
print("Received messages in seconds".format(total, run_time))
但是,我看到另一个错误说
EventHubError: This receive handler is now closed.
有什么方法可以解决这个问题?
【问题讨论】:
嗨,Brian,你能解释一下这个“如果我添加超过五个接收器,它会达到 xxxx 的限制”吗? @IvanYang 这意味着 Azure 事件中心限制了每个消费者的接收者数量,而我正在达到这个限制。 @IvanYang 抱歉,有错别字。 你说你知道这个错误来自哪里。你能给我解释一下吗?我遇到了同样的错误,最近才开始使用 azure 【参考方案1】:经过努力,我认为这可能是解决此问题的方法。
对于上面的第 22 行,我可以简单地添加“keep_alive”输入,例如:
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET, keep_alive = 10000000)
这样,我可以让客户端接收器保持打开状态,而不会收到错误消息“EventHubError:此接收处理程序现已关闭。”,然后只需从下面运行唯一的部分:
start_time = time.time()
while True:
for event_data in receiver.receive(timeout=5000):
print("Received: ".format(event_data.body_as_str(encoding='UTF-8')))
a = event_data.body_as_str(encoding='UTF-8')
total += 1
end_time = time.time()
run_time = end_time - start_time
print("Received messages in seconds".format(total, run_time))
【讨论】:
我遇到了同样的问题,但使用的是 Nodejs API。 nodejs 代码没有参数 keep_alive 。你知道如何解决这个问题吗?任何帮助表示赞赏。我现在正在寻找一个星期的解决方案以上是关于超过每个分区允许的最大接收器数量 eventthub 中的错误的主要内容,如果未能解决你的问题,请参考以下文章