如何从我的连接自己的通道以外的其他通道恢复未确认的 AMQP 消息?
Posted
技术标签:
【中文标题】如何从我的连接自己的通道以外的其他通道恢复未确认的 AMQP 消息?【英文标题】:How can I recover unacknowledged AMQP messages from other channels than my connection's own? 【发布时间】:2011-10-27 03:40:50 【问题描述】:似乎我的 rabbitmq 服务器运行时间越长,未确认消息的问题就越多。我很想重新排队。实际上,似乎有一个 amqp 命令可以执行此操作,但它仅适用于您的连接正在使用的通道。我构建了一个小鼠兔脚本至少可以尝试一下,但是我要么遗漏了一些东西,要么无法以这种方式完成(用 rabbitmqctl 怎么样?)
import pika
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
credentials=credentials, virtual_host='***')
def handle_delivery(body):
"""Called when we receive a message from RabbitMQ"""
print body
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
connection.channel(on_channel_open)
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.basic_recover(callback=handle_delivery,requeue=True)
try:
connection = pika.SelectConnection(parameters=parameters,\
on_open_callback=on_connected)
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
【问题讨论】:
你能解决这个问题吗? ***.com/questions/8296201/… SO 答案可能需要什么,这取决于您为什么还有其他频道仍然挂着未确认的消息。僵尸频道。不重复,因为这个主题是关于其他频道中的消息,而不是频道本身。 【参考方案1】:未确认消息是那些已通过网络传递给消费者但尚未被确认或拒绝的消息 - 但消费者尚未关闭最初接收它们的通道或连接。因此,代理无法确定消费者是否只是花了很长时间来处理这些消息,或者它是否已经忘记了它们。因此,它使它们处于未确认状态,直到消费者死亡或它们被确认或拒绝。
由于这些消息在未来仍可能由最初消费它们的仍然活着的消费者有效处理,因此(据我所知)您不能将另一个消费者插入其中并尝试对它们做出外部决策。您需要让您的消费者在处理每条消息时对其做出决定,而不是让旧消息不被确认。
【讨论】:
所以basic.recover 必须被消费者调用?我使用 celeryd 来管理连接。可以使用 celeryctl 将该恢复命令发送到响应不佳的队列(如果您熟悉...) @will 我对您使用 Celery 表示哀悼。 celery 开发人员根本不了解 AMQP,并创建了一个严重损坏的实现。您需要做出选择,要么摆脱 celery 并正确执行 AMQP,要么停止将 AMQP 与 celery 一起使用并改用像 Redis 这样简单的东西。我选择放弃 celery 并继续使用 AMQP。 那是相当的控诉。如果你不介意我问,celery 的 AMQP 实现没有正确执行怎么办? 我同意,我想听听你为什么认为 Celery 坏得这么厉害。它使用非常广泛,这是我第一次听到这种抱怨。【参考方案2】:如果消息未被确认,只有两种方法可以将它们重新放入队列:
basic.nack
此命令将导致消息被放回队列并重新传递。
与代理断开连接
此操作将强制将此通道中所有未确认的消息放回队列中。
注意:basic.recover 将尝试在同一频道(给同一消费者)上重新发布未确认的消息,这有时是所需的行为。
RabbitMQ spec for basic.recover and basic.nack
真正的问题是:为什么消息未被确认?
可能导致消息未确认的情况:
消费者获取了太多消息,然后没有足够快地处理和确认它们。
解决方案:预取尽可能少的消息。
有缺陷的客户端库(我目前在使用 pika 0.9.13 时遇到了这个问题。如果队列中有很多消息,那么一定数量的消息会被卡住,即使是几个小时后也是如此。 p>
解决方案:我必须多次重启消费者,直到所有未确认的消息都从队列中消失。
【讨论】:
您的鼠兔问题是否已报告?能给个链接吗? 这是一个 python 递归限制。关于无法递归 >1000 次的问题,这显然是 pika 0.9.13 发生的事情。 0.9.14 没有看到它。 终于找到报出问题的地方了:github.com/pika/pika/issues/286 rabbitmq.com/consumers.html#acknowledgement-timeout 新版本中有此设置以避免消费者超时。【参考方案3】:一旦所有工作人员/消费者停止,所有未确认的消息将进入就绪状态。
通过在ps aux
输出上确认grep
来确保停止所有工作人员,并在发现时停止/杀死它们。
如果您正在使用 supervisor 管理工作人员,显示为工作人员已停止,您可能需要检查僵尸。 Supervisor 报告该工作程序已停止,但在 ps aux 输出上 grepped 时,您仍然会发现僵尸进程正在运行。杀死僵尸进程将使消息恢复到就绪状态。
【讨论】:
您还可以通过使用 RabbitMQ 管理控制台来确定兔子连接是否被僵尸进程阻止,正如我在这里描述的那样:***.com/questions/11926077/…以上是关于如何从我的连接自己的通道以外的其他通道恢复未确认的 AMQP 消息?的主要内容,如果未能解决你的问题,请参考以下文章