当我尝试使用 pika (python) 向 RabbitMQ 确认消息时出现错误“未知的传递标签”
Posted
技术标签:
【中文标题】当我尝试使用 pika (python) 向 RabbitMQ 确认消息时出现错误“未知的传递标签”【英文标题】:Error "unknown delivery tag" occurs when i try ack messages to RabbitMQ using pika (python) 【发布时间】:2012-03-12 15:27:26 【问题描述】:我想在几个线程中处理消息,但在执行此代码时出现错误:
from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback
def doWork(body, args, channel):
r = random.random()
time.sleep(r * 10)
try:
channel.basic_ack(delivery_tag=args.delivery_tag)
except :
traceback.print_exc()
auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()
while True:
time.sleep(0.03)
try:
method_frame, header_frame, body = channel.basic_get(queue="test_queue")
if method_frame.NAME == 'Basic.GetEmpty':
continue
t = threading.Thread(target=doWork, args=[body, method_frame, channel])
t.setDaemon(True)
t.start()
except Exception, e:
traceback.print_exc()
continue
错误说明:
回溯(最近一次通话最后): 文件“C:\work\projects\mq\start.py”,第 43 行,在 method_frame, header_frame, body = channel.basic_get(queue="test_queue") 文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 318 行,在 basic_get self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack) 文件“C:\work\projects\mq\libs\pika\channel.py”,第 469 行,在 basic_get no_ack=no_ack)) 文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 244 行,在 send_method 中 self.connection.process_data_events() 文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 94 行,在 process_data_events self._handle_read() _handle_read 中的文件“C:\work\projects\mq\libs\pika\adapters\base_connection.py”,第 162 行 self._on_data_available(数据) _on_data_available 中的文件“C:\work\projects\mq\libs\pika\connection.py”,第 589 行 框架)#参数 文件“C:\work\projects\mq\libs\pika\callback.py”,第 124 行,正在进行中 回调(*参数,**关键字) _on_remote_close 中的文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 269 行 frame.method.reply_text) AMQPChannelError: (406, 'PRECONDITION_FAILED - 未知的交付标签 204')版本:pika 0.9.5、rabbitMQ 2.6.1
【问题讨论】:
昨天我尝试使用 py-amqplib 库代替 pika。它工作得很好。 pika 库中可能存在问题。 如果你想在多个线程之间共享你的代码,你应该使用一个线程安全的库,比如 rabbitpy 或 amqp-storm。不确定 py-amqplib 是否是线程安全的。 github.com/eandersson/amqp-storm 【参考方案1】:问题可能是您将no_ack=True
设置为这样:
consumer_tag = channel.basic_consume(
message_delivery_event,
no_ack=True,
queue=queue,
)
然后确认消息:
channel.basic_ack(delivery_tag=args.delivery_tag)
您必须选择是否要确认并设置正确的消费参数。
【讨论】:
我的代码的根本原因是同步问题和配置问题。我有一个简单的包装来创建 rabbitmq 消费者。当使用临时队列(channel.queueDeclare("", false, true, true, args).getQueue())时,需要在多线程环境中使用同步来保护 nextDelivery。这意味着,如果您收到一条消息,您需要在使用任何其他消息之前确认它。否则调用ack时会抛出异常,consume时会一直抛出异常... 这正是我遇到的问题,非常感谢。 我在合并失败后收到此错误,其中一条消息使用相同的传递标签被确认两次【参考方案2】:对我来说,我只是告诉队列我不会确认,然后我确认了。
例如错误:
channel.basic_consume(callback, queue=queue_name, no_ack=True)
然后在我的回调中:
def callback(ch, method, properties, body):
# do stuff
ch.basic_ack(delivery_tag = method.delivery_tag)
正确:
channel.basic_consume(callback, queue=queue_name, no_ack=False)
底线:如果要手动确认,请设置 no_ack=False。
来自文档:
没有确认: (布尔) 如果设置为 True,将使用自动确认模式(请参阅http://www.rabbitmq.com/confirms.html)
【讨论】:
谢谢。这真的很有用。我看到的问题是参数名称(no_ack 或 .net 中的 noAck)有点令人困惑。我觉得它应该被称为“ack”,如果你通过 true 它将确认消息。【参考方案3】:您的代码存在错误。您跨线程共享通道。 pika 不支持此功能(请参阅FAQ)。你有两个选择:
-
在
basic_get(...)
中定义no_ack=True
标志并且不要在线程函数doWork(...)
中使用通道对象
如果您需要在完成工作后才确认消息,则让主线程(while True:
循环)处理消息确认(而不是工作线程)。下面是执行此操作的代码的修改版本。
from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback
from Queue import Queue, Empty
def doWork(body, args, channel, ack_queue):
time.sleep(random.random())
ack_queue.put(args.delivery_tag)
def doAck(channel):
while True:
try:
r = ack_queue.get_nowait()
except Empty:
r = None
if r is None:
break
try:
channel.basic_ack(delivery_tag=r)
except:
traceback.print_exc()
auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()
# Create a queue for the messages that should be ACKed by main thread
ack_queue = Queue()
while True:
time.sleep(0.03)
try:
doAck(channel)
method_frame, header_frame, body = channel.basic_get(queue="test_queue")
if method_frame.NAME == 'Basic.GetEmpty':
continue
t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
t.setDaemon(True)
t.start()
except Exception, e:
traceback.print_exc()
continue
【讨论】:
【参考方案4】:我没有修复,但我可以使用 BlockingConnection 验证它是否发生 适配器。
当确认或拒绝响应 channel.basic_recover() 重新传递的消息时,总是会发生这种情况
pika 0.9.5、rabbitMQ 2.2.0、python 2.7 和 Erlang R14B01
我的解决方法是始终指定 Deliver_tag=0
我怀疑这仅在您正在确认/确认的消息是您阅读的最后一条消息(在流中)时才有效。我正在编写的库以一种可以独立确认每个消息的方式对消息进行抽象,这与此解决方案不同。
任何人都可以确认这是否已被 pika 团队中的任何人修复或确认吗?或者,这可能是 RabbitMQ 的问题吗?
【讨论】:
我在 node-amqp 中看到了这个错误,所以它一定是 RabbitMQ(版本 3.0.2-1)的问题。【参考方案5】:看到RabbitMQ - upgraded to a new version and got a lot of "PRECONDITION_FAILED unknown delivery tag 1"之后
我将基本消耗更改为如下所示:
consumer_tag = channel.basic_consume(
message_delivery_event,
no_ack=True,
queue=queue,
)
当指定消息的传递标签时,这会在初始(未重新传递)确认中导致所描述的错误。传递是从消息传递的方法结构中提取的。
使用
channel.basic_ack(delivery_tag=0)
在这种情况下也抑制错误
查看http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html 看起来好像它可能是 RabbitMQ 中的一个问题。
【讨论】:
【参考方案6】:这个问题是因为你设置了 noack: true ,但仍然试图发送确认。
【讨论】:
【参考方案7】:如果您尝试在创建消息的不同通道上确认消息,您也可能会遇到此错误。如果您正在关闭或重新创建频道,则可能会发生这种情况。
来自文档:https://www.rabbitmq.com/confirms.html
代理会抱怨“未知传递标签”的另一种情况是,当尝试在与接收传递的通道不同的通道上进行确认时,无论是肯定的还是否定的。必须在同一渠道上确认交付。
【讨论】:
以上是关于当我尝试使用 pika (python) 向 RabbitMQ 确认消息时出现错误“未知的传递标签”的主要内容,如果未能解决你的问题,请参考以下文章
获取 pika.exceptions.StreamLostError:传输指示 EOF 在运行使用 pika 的 python 脚本 docker 映像时
python 使用`pika.BlockingConnection`的RabbitMQ客户端,主要用于发布。