Servicestack RabbitMQ:当 RabbitMqProducer 无法在 RPC 模式中重新声明临时队列时,无限循环填充死信队列
Posted
技术标签:
【中文标题】Servicestack RabbitMQ:当 RabbitMqProducer 无法在 RPC 模式中重新声明临时队列时,无限循环填充死信队列【英文标题】:Servicestack RabbitMQ: Infinite loop fills up dead-letter-queue when RabbitMqProducer cannot redeclare temporary queue in RPC-pattern 【发布时间】:2014-09-26 15:24:47 【问题描述】:当我声明一个临时回复队列为独占时(例如 rpc-pattern 中的匿名队列 (exclusive=true, autodelete=true)),响应消息无法发布到指定的回复队列(例如 message.replyTo=" amq.gen-Jg_tv8QYxtEQhq0tF30vAA") 因为 RabbitMqProducer.PublishMessage() 尝试使用不同的参数(exclusive=false)重新声明队列,这可以理解地导致错误。
不幸的是,在 RabbitMqProducer.PublishMessage() 中对 channel.RegisterQueue(queueName) 的错误调用似乎对传入队列中的请求消息进行了确认,因此,当 ServiceStack.Messaging.MessageHandler.DefaultInExceptionHandler 尝试确认请求消息时(到将其从传入队列中删除),消息仅停留在传入队列的顶部并再次被处理。此过程无限重复,每次迭代产生一个 dlq 消息,慢慢填满 dlq。
我想知道,
如果ServiceStack处理这种情况,当ServiceStack.RabbitMq.RabbitMqProducer无法声明响应队列时,正确 如果 ServiceStack.RabbitMq.RabbitMqProducer 必须始终在发布响应之前声明响应队列 如果最好有一些配置标志来省略所有交换和队列声明调用(在第一次初始化之外)。 RabbitMqProducer 会假设每个队列/交换都已正确设置并发布消息。(目前我们的客户端只是将其响应队列声明为exclusive=false,一切正常。但我真的很想使用rabbitmq的内置临时队列。)
MQ-Client 代码,需要简单的“SayHello”服务:
const string INQ_QUEUE_NAME = "mq:SayHello.inq";
const string EXCHANGE_NAME="mx.servicestack";
var factory = new ConnectionFactory() HostName = "192.168.179.110" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
// Create temporary queue and setup bindings
// this works (because "mq:tmp:" stops RabbitMqProducer from redeclaring response queue)
string responseQueueName = "mq:tmp:SayHello_" + Guid.NewGuid().ToString() + ".inq";
channel.QueueDeclare(responseQueueName, false, false, true, null);
// this does NOT work (RabbitMqProducer tries to declare queue again => error):
//string responseQueueName = Guid.NewGuid().ToString() + ".inq";
//channel.QueueDeclare(responseQueueName, false, false, true, null);
// this does NOT work either (RabbitMqProducer tries to declare queue again => error)
//var responseQueueName = channel.QueueDeclare().QueueName;
// publish simple SayHello-Request to standard servicestack exchange ("mx.servicestack") with routing key "mq:SayHello.inq":
var props = channel.CreateBasicProperties();
props.ReplyTo = responseQueueName;
channel.BasicPublish(EXCHANGE_NAME, INQ_QUEUE_NAME, props, Encoding.UTF8.GetBytes("\"ToName\": \"Chris\""));
// consume response from response queue
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(responseQueueName, true, consumer);
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
// print result: should be "Hello, Chris!"
Console.WriteLine(Encoding.UTF8.GetString(ea.Body));
当 RabbitMqProducer 不尝试声明队列时,一切似乎都正常工作,就像这样:
public void PublishMessage(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
const bool MustDeclareQueue = false; // new config parameter??
try
if (MustDeclareQueue && !Queues.Contains(routingKey))
Channel.RegisterQueueByName(routingKey);
Queues = new HashSet<string>(Queues) routingKey ;
Channel.BasicPublish(exchange, routingKey, basicProperties, body);
catch (OperationInterruptedException ex)
if (ex.Is404())
Channel.RegisterExchangeByName(exchange);
Channel.BasicPublish(exchange, routingKey, basicProperties, body);
throw;
【问题讨论】:
看起来像是特定的应用程序逻辑问题。声明操作对于队列和交换都是幂等的,因此您遇到的问题是尝试使用不同的标志或参数重新声明队列。 谢谢,但我已经明白了。 (我编辑了我的问题以使其更清楚。)我的观点是,在我看来,servicestack 的 messageproducer 似乎可以处理无法很好地声明指定的响应队列的情况(=>无限循环填满 dlq)。 【参考方案1】:该问题已在 servicestack 的 v4.0.32 版本中得到解决(已在此 commit 中修复)。
RabbitMqProducer 不再尝试重新声明临时队列,而是假定回复队列已经存在(这解决了我的问题。)
(无限循环的根本原因(发布响应消息时错误处理错误)可能仍然存在。)
编辑:示例
以下基本 mq-client(不使用 ServiceStackmq 客户端,而是直接依赖于 rabbitmq 的 .net-library;不过它使用 ServiceStack.Text 进行序列化)可以执行通用 RPC:
public class MqClient : IDisposable
ConnectionFactory factory = new ConnectionFactory()
HostName = "192.168.97.201",
UserName = "guest",
Password = "guest",
//VirtualHost = "test",
Port = AmqpTcpEndpoint.UseDefaultPort,
;
private IConnection connection;
private string exchangeName;
public MqClient(string defaultExchange)
this.exchangeName = defaultExchange;
this.connection = factory.CreateConnection();
public TResponse RpcCall<TResponse>(IReturn<TResponse> reqDto, string exchange = null)
using (var channel = connection.CreateModel())
string inq_queue_name = string.Format("mq:0.inq", reqDto.GetType().Name);
string responseQueueName = channel.QueueDeclare().QueueName;
var props = channel.CreateBasicProperties();
props.ReplyTo = responseQueueName;
var message = ServiceStack.Text.JsonSerializer.SerializeToString(reqDto);
channel.BasicPublish(exchange ?? this.exchangeName, inq_queue_name, props, UTF8Encoding.UTF8.GetBytes(message));
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(responseQueueName, true, consumer);
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//channel.BasicAck(ea.DeliveryTag, false);
string response = UTF8Encoding.UTF8.GetString(ea.Body);
string responseType = ea.BasicProperties.Type;
Console.WriteLine(" [x] New Message of Type '1' Received:20", response, responseType, Environment.NewLine);
return ServiceStack.Text.JsonSerializer.DeserializeFromString<TResponse>(response);
~MqClient()
this.Dispose();
public void Dispose()
if (connection != null)
this.connection.Dispose();
this.connection = null;
要点:
客户端声明匿名队列(=队列名称为空)channel.QueueDeclare()
服务器生成队列并返回队列名称(amq.gen*)
客户端将队列名称添加到消息属性 (props.ReplyTo = responseQueueName;
)
ServiceStack 自动向临时队列发送响应
客户端获取响应并反序列化
可以这样使用:
using (var mqClient = new MqClient("mx.servicestack"))
var pingResponse = mqClient.RpcCall<PingResponse>(new Ping );
重要提示:您必须使用 servicestack 版本 4.0.32+。
【讨论】:
那么您能够让 RPC 以可靠的多线程方式工作吗?介意发布您的 SayHello 服务吗? 在我的答案中添加了示例代码。请注意,此代码不在生产中。 (我们从 java web 服务执行 rpc 调用。)以上是关于Servicestack RabbitMQ:当 RabbitMqProducer 无法在 RPC 模式中重新声明临时队列时,无限循环填充死信队列的主要内容,如果未能解决你的问题,请参考以下文章
ServiceStack.Redis常用操作 - 事务并发锁_转