如何使用 EasyNetQ / RabbitMQ 进行错误处理
Posted
技术标签:
【中文标题】如何使用 EasyNetQ / RabbitMQ 进行错误处理【英文标题】:How to do error handling with EasyNetQ / RabbitMQ 【发布时间】:2015-09-04 01:11:12 【问题描述】:我在 C# 中使用带有 EasyNetQ 库的 RabbitMQ。我在这里使用 pub/sub 模式。我还有一些问题希望有人能帮助我解决:
-
当使用消息时出现错误,它会自动移动到错误队列中。如何实现重试(以便将其放回原始队列,并且当它无法处理 X 次时,它会被移至死信队列)?
据我所知,总是有 1 个错误队列用于转储来自所有其他队列的消息。如何为每种类型设置 1 个错误队列,以便每个队列都有自己关联的错误队列?
如何轻松重试错误队列中的消息?我试过软管,但它只是将消息重新发布到错误队列而不是原始队列。我也不太喜欢这个选项,因为我不想在控制台中摆弄。我最好只针对错误队列进行编程。
有人吗?
【问题讨论】:
【参考方案1】:您在使用 EasyNetQ/RabbitMQ 时遇到的问题是,与 SQS 或 Azure 服务总线/队列等其他消息传递服务相比,它更加“原始”,但我会尽力为您指明正确的方向.
问题 1。
这将由您来完成。最简单的方法是你可以在 RabbitMQ/EasyNetQ 中 No-Ack 一条消息,它会被放在队列的头部让你重试。这并不是真正可取的,因为它几乎会立即重试(没有时间延迟),并且还会阻止其他消息被处理(如果您有一个预取计数为 1 的单个订阅者)。
我见过使用“MessageEnvelope”的其他实现。所以一个包装类,当消息失败时,您在 MessageEnvelope 上增加一个重试变量并将消息重新传递回队列。您必须这样做并在消息处理程序周围编写包装代码,这不是 EasyNetQ 的功能。
使用上述方法,我还看到人们使用信封,但允许邮件是死信。一旦它进入死信队列,就会有另一个应用程序/工作人员从死信队列中读取项目。
上述所有这些方法都有一个小问题,即在处理消息时实际上没有任何好的方法可以使对数/指数/任何类型的延迟增加。在将消息返回到队列之前,您可以在代码中“保留”消息一段时间,但这不是一个好的方法。
在所有这些选项中,您自己的自定义应用程序读取死信队列并根据包含重试计数的信封决定是否重新路由消息可能是最好的方法。
问题 2。
您可以使用高级 API 为每个队列指定死信交换。 (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues)。然而,这意味着您将不得不在几乎所有地方使用高级 API,因为使用订阅/发布的简单 IBus 实现查找基于消息类型和订阅者名称命名的队列。使用自定义队列声明意味着您将自己处理队列的命名,这意味着当您订阅时,您需要知道您想要的名称等。不再为您自动订阅!
问题 3
错误队列/死信队列只是另一个队列。您可以收听此队列并使用它做您需要做的事情。但实际上并没有任何开箱即用的解决方案听起来适合您的需求。
【讨论】:
我们发现标准 EasyNetQ 实现除了与一些共享 .NET 类绑定的单个快速演示之外,对于初次使用的用户来说确实没有实际用途。之后,切换到 Easy 的超简单“高级”API。是的,你可以做一些高级的事情,但老实说,这是一个非常简单易用的 API。绝对是 Easy 的高级 API 的粉丝,适用于所有工作。【参考方案2】:我已经完全按照您的描述实现了。以下是基于我的经验并与您的每个问题相关的一些提示。
Q1(如何重试X次):
为此,您可以使用IMessage.Body.BasicProperties.Headers
。当您使用错误队列中的消息时,只需添加一个带有您选择的名称的标题。在进入错误队列的每条消息上查找此标头并将其递增。这将为您提供正在运行的重试计数。
非常重要当消息超过 X 的重试限制时,您有一个策略来处理。您不想丢失该消息。就我而言,我当时将消息写入磁盘。它为您提供了许多有用的调试信息以供稍后使用,因为 EasyNetQ 会自动将您的原始消息与错误信息一起包装。它还具有原始消息,因此您可以根据需要手动(或通过一些批处理重新处理代码自动)稍后以某种受控方式重新排列消息。
您可以查看 Hosepipe 实用程序中的代码以了解执行此操作的好方法。事实上,如果您遵循您在此处看到的模式,那么您甚至可以稍后根据需要使用 Hosepipe 将消息重新排队。
Q2(如何为每个原始队列创建一个错误队列):
您可以使用 EasyNetQ Advanced Bus 干净利落地执行此操作。使用IBus.Advanced.Container.Resolve<IConventions>
进入约定界面。然后您可以使用conventions.ErrorExchangeNamingConvention
和conventions.ErrorQueueNamingConvention
设置错误队列命名约定。在我的例子中,我将约定设置为基于原始队列的名称,这样每次创建队列时我都会得到队列/queue_error 对。
Q3(如何处理错误队列中的消息):
您可以像对任何其他队列一样为错误队列声明消费者。同样,AdvancedBus 允许您通过指定从队列中出来的类型是EasyNetQ.SystemMessage.Error
来干净地完成此操作。所以,IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>()
会带你到那里。重试只是意味着重新发布到原始交换(注意您放入标头的重试计数(请参阅我对上面 Q1 的回答),并且您从错误队列中消耗的错误消息中的信息可以帮助您找到目标重新发布。
【讨论】:
我搞定了,看这里的例子***.com/questions/32077044/…【参考方案3】:我知道这是一篇旧帖子,但是 - 以防万一它对其他人有所帮助 - 这里是 my self-answered question(我需要问它,因为现有的帮助还不够),它解释了我如何实现在其原始队列上重试失败的消息.以下应该回答您的问题#1 和#3。对于#2,您可能必须使用我没有使用过的高级 API(我认为它违背了 EasyNetQ 的目的;还不如直接使用 RabbitMQ 客户端)。不过,也请考虑实施 IConsumerErrorStrategy。
1) 由于一条消息可以有多个消费者,并且所有人都可能不需要重试消息,我在消息正文中有一个Dictionary<consumerId, RetryInfo>
,因为 EasyNetQ 不(开箱即用)支持复杂类型在邮件标题中。
public interface IMessageType
int MsgTypeId get;
Dictionary<string, TryInfo> MsgTryInfo get; set;
2) 我已经实现了一个class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer
,它只会在框架每次调用它时更新 TryCount 和其他信息。我通过 EasyNetQ 提供的 IoC 支持将这个自定义序列化程序附加到每个消费者的框架中。
public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
public string Serialize(byte[] messageBody)
string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);
// Add/update RetryInformation into objectifiedMsgBody here
// I have a dictionary that saves <key:consumerId, val: TryInfoObj>
return JsonConvert.SerializeObject(objectifiedMsgBody);
在我的 EasyNetQ 包装类中:
public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
if (enableRetry)
_defaultBus = RabbitHutch.CreateBus(currentConnString,
serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
);
else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
_defaultBus = RabbitHutch.CreateBus(currentConnString);
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
3) 将消息添加到默认错误队列后,您可以拥有一个简单的控制台应用程序/Windows 服务,该服务会定期在其原始队列中重新发布现有错误消息。类似的东西:
var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
4) 我有一个包含回调函数的 MessageHandler 类。每当消息传递给消费者时,它都会转到 MessageHandler,它决定消息尝试是否有效,如果是,则调用实际的回调。如果尝试无效(maxRetriesExceeded/消费者无论如何都不需要重试),我会忽略该消息。在这种情况下,您可以选择对消息进行死信。
public interface IMsgHandler<T> where T: class, IMessageType
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc get; set;
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
这是MsgHandler
中调用回调的中介函数:
public async Task InvokeMsgCallbackFunc(T msg)
if (IsTryValid(msg, CurrSubscriptionId))
await this.MsgCallbackFunc(msg);
else
// Do whatever you want
【讨论】:
以上是关于如何使用 EasyNetQ / RabbitMQ 进行错误处理的主要内容,如果未能解决你的问题,请参考以下文章
EasyNetQ-用于使用 RabbitMQ 的 .NET API开源的工具库