如何处理 WCF 的 MSMQ 绑定中的消息失败

Posted

技术标签:

【中文标题】如何处理 WCF 的 MSMQ 绑定中的消息失败【英文标题】:How do I handle message failure in MSMQ bindings for WCF 【发布时间】:2010-09-10 01:45:13 【问题描述】:

我已经创建了一个 WCF 服务并且正在使用 netMsmqBinding 绑定。

这是一个简单的服务,它将 Dto 传递给我的服务方法并且不期望响应。消息被放置在一个 MSMQ 中,一旦被拾取就插入到一个数据库中。

确保没有数据丢失的最佳方法是什么。

我尝试了以下两种方法:

    抛出异常

    这会将消息放入死信队列以供手动阅读。我可以在我的服务开始时处理这个

    在绑定上设置receiveRetryCount="3"

    在 3 次尝试之后 - 瞬间发生,这似乎将消息留在队列中,但我的服务出错了。重新启动我的服务会重复此过程。

理想情况下,我想做以下事情:

尝试处理消息

如果失败,请等待该消息 5 分钟,然后重试。 如果该进程失败 3 次,则将消息移至死信队列。 重新启动服务会将死信队列中的所有消息推回队列,以便对其进行处理。

我能做到这一点吗?如果有怎么办? 您能否向我指出任何关于如何在我给定的场景中最好地利用 WCF 和 MSMQ 的好文章。

任何帮助将不胜感激。谢谢!

一些附加信息

我在 Windows XP 和 Windows Server 2003 上使用 MSMQ 3.0。 不幸的是,我无法使用针对 MSMQ 4.0 和 Vista/2008 的内置有害消息支持。

【问题讨论】:

【参考方案1】:

我认为使用 MSMQ(仅在 Vista 上可用)您可以这样做:

<bindings>
    <netMsmqBinding>
        <binding name="PosionMessageHandling"
             receiveRetryCount="3"
             retryCycleDelay="00:05:00"
             maxRetryCycles="3"
             receiveErrorHandling="Move" />
    </netMsmqBinding>
</bindings>

WCF 将在第一次调用失败后立即重试 ReceiveRetryCount 次。批处理失败后,消息被移动 到重试队列。延迟 RetryCycleDelay 分钟后,消息从重试队列移动到端点队列,并重试批处理。这将重复 MaxRetryCycle 时间。如果所有这些都失败,则根据可以移动的 receiveErrorHandling 处理消息 (对中毒队列)、拒绝、丢弃或故障

顺便说一句,关于 WCF 和 MSMQ 的好文章是来自 Juval Lowy 的 Progammig WCF 书的第 9 章

【讨论】:

这适用于 MSMQ 4.0 而不是 MSMQ 3.0【参考方案2】:

不幸的是,我被困在 Windows XP 和 Windows Server 2003 上,所以这对我来说不是一个选择。 - (我将在我的问题中重新澄清这一点,因为我在发布后发现了这个解决方案并意识到我无法使用它)

我发现一种解决方案是设置一个自定义处理程序,它将我的消息移动到另一个队列或毒队列并重新启动我的服务。 这对我来说似乎很疯狂。想象一下,我的 Sql Server 宕机了,服务多久会重新启动一次。

所以我最终做的是让线路发生故障并将消息留在队列中。 我还向我的系统日志记录服务记录了一条致命消息,表明这已经发生。 一旦我们的问题得到解决,我会重新启动服务,所有消息都会重新开始处理。

我意识到重新处理此消息或任何其他消息都会失败,那么为什么需要将此消息和其他消息移动到另一个队列。我不妨停止我的服务,并在一切正常时重新启动它。

aogan,你对 MSMQ 4.0 有完美的答案,但不幸的是,我不适合

【讨论】:

【参考方案3】:

如果您使用的是 SQL-Server,那么您应该使用分布式事务,因为 MSMQ 和 SQL-Server 都支持它。发生的情况是您将数据库写入包装在 TransactionScope 块中并仅在成功时调用 scope.Complete() 。如果失败,那么当您的 WCF 方法返回时,消息将被放回队列中以再次尝试。这是我使用的精简版代码:

    [OperationBehavior(TransactionScopeRequired=true, TransactionAutoComplete=true)]
    public void InsertRecord(RecordType record)
    
        try
        
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            
                SqlConnection InsertConnection = new SqlConnection(ConnectionString);
                InsertConnection.Open();

                // Insert statements go here

                InsertConnection.Close();

                // Vote to commit the transaction if there were no failures
                scope.Complete();
            
        
        catch (Exception ex)
        
            logger.WarnException(string.Format("Distributed transaction failure for 0", 
                Transaction.Current.TransactionInformation.DistributedIdentifier.ToString()),
                ex);
        
     

我通过排队大量但已知数量的记录来测试这一点,让 WCF 启动大量线程来同时处理其中的许多(达到 16 个线程 - 一次队列中有 16 条消息),然后终止进程中的进程操作的中间。当程序重新启动时,从队列中读回消息并再次处理,就好像什么都没发生一样,并且在测试结束时数据库是一致的并且没有丢失的记录。

分布式事务管理器具有环境存在,当您创建 TransactionScope 的新实例时,它会自动在方法调用范围内搜索当前事务 - WCF 在弹出消息时应该已经创建了该事务退出队列并调用您的方法。

【讨论】:

嗨,克里斯。我确信通过将您的操作行为归因于 TransactionScopeRequired=true 否定了将您的 Sql 调用包装在事务范围中的需要,因为这已经完成了。话虽如此,我不确定您的回答与我的 MSMQ 问题有何关系。【参考方案4】:

SDK 中有一个示例可能对您的情况有用。基本上,它所做的是将 IErrorHandler 实现附加到您的服务,当 WCF 将消息声明为“毒药”(即,当所有配置的重试都已用尽时)时,它将捕获错误。该示例所做的是将消息移动到另一个队列,然后重新启动与该消息关联的 ServiceHost(因为它会在发现有害消息时发生故障)。

这不是一个非常漂亮的示例,但它可能很有用。不过有一些限制:

1- 如果您有多个与您的服务关联的端点(即通过多个队列公开),则无法知道有害消息到达哪个队列。如果您只有一个队列,这不会是问题.我还没有看到任何官方的解决方法,但我已经尝试了一种可能的替代方案,我在这里记录了:http://winterdom.com/weblog/2008/05/27/NetMSMQAndPoisonMessages.aspx

2- 一旦问题消息被移动到另一个队列,它就成为你的责任,所以一旦超时完成,你就可以将它移回处理队列(或者将新服务附加到该队列来处理它)。

老实说,在任何一种情况下,您都在这里看到了 WCF 本身并没有涵盖的一些“手动”工作。

我最近一直在从事一个不同的项目,我需要明确控制重试发生的频率,我当前的解决方案是创建一组重试队列并在重试队列和主处理之间手动移动消息基于一组计时器和一些启发式的队列,只使用原始 System.Messaging 东西来处理 MSMQ 队列。它似乎工作得很好,但如果你这样做会有几个问题。

【讨论】:

以上是关于如何处理 WCF 的 MSMQ 绑定中的消息失败的主要内容,如果未能解决你的问题,请参考以下文章

真实生产案例消息中间件如何处理消费失败的消息?

消息中间件 MQ 如何处理消费失败的消息?

RabbitMQ - Apache Camel 读取消息如何处理失败的消息

如何处理消息被订阅者客户端确认失败?

电商场景下,如何处理消费过程中的重复消息?

如果 DLL 的 app.config 应该在“主配置”中……我们如何处理 DLL 中的 WCF 引用?