NServiceBus - 如何为每个接收者订阅的消息类型获取单独的队列?

Posted

技术标签:

【中文标题】NServiceBus - 如何为每个接收者订阅的消息类型获取单独的队列?【英文标题】:NServiceBus - How to get separate queue for each message type receiver subscribes to? 【发布时间】:2012-02-23 10:56:03 【问题描述】:

我有以下情况:

因此,接收者订阅了两种事件:eventA 和 eventB。 NServiceBus 为接收者(Receiver)创建队列,并将 eventA 和 eventB 类型的消息放置到同一个队列中。问题是,如果我可以将 NServiceBus 配置为对接收者的每种类型的事件使用单独的队列(ReceiverEventA 和 ReceiverEventB)?或者我可以在单个进程中有两个接收器(每个接收器单独的队列)。 问题是,EventA 的处理时间比 EventB 长得多,而且它们是独立的 - 所以如果它们位于不同的队列中,它们可以同时处理。

更新:如果我采用这样的幼稚方法,接收器无法以空引用异常开始:

 private static IBus GetBus<THandler, TEvent>()
                       
        var bus = Configure.With(new List<Type>
                                     
                                         typeof(THandler), 
                                         typeof(TEvent),
                                         typeof(CompletionMessage)
                                     )
            .Log4Net()
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
            .UnicastBus()
            .LoadMessageHandlers()
            .ImpersonateSender(false);

        bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);

        return bus.CreateBus().Start();
    

    [STAThread]
    static void Main()
    
        Busses = new List<IBus>
                     
                         GetBus<ItemEventHandlerA, ItemEventA>(),
                         GetBus<ItemEventHandlerB, ItemEventB>()
                     ;          

        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);
        Application.Run(new TestForm());
    

异常堆栈跟踪是:

在 NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler,TEvent 在 C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:第 57 行 在 C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 26 中的 NServiceBusTest2.WinFormsReceiver.Program.Main() 在 System.AppDomain._nExecuteAssembly(RuntimeAssembly 程序集,String[] args) 在 System.AppDomain.ExecuteAssembly(字符串 assemblyFile,证据 assemblySecurity,String [] args) 在 Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly() 在 System.Threading.ThreadHelper.ThreadStart_Context(对象状态) 在 System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback 回调, 对象状态, Boolean ignoreSyncCtx) 在 System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback 回调, 对象状态) 在 System.Threading.ThreadHelper.ThreadStart()

【问题讨论】:

【参考方案1】:

我开始写一个更冗长的解释,说明为什么不应该有两个单独的进程,以支持我对@stephenl 发布的答案的评论。 NServiceBus 基本上对每个进程强制执行一个输入队列。

所以通常情况下,您将有两个独立的进程。 EventAService 会从 QForEventA 中读取 EventA,而 EventBService 会从 QForEventB 中读取 EventB。

然后我更仔细地查看了您的示例代码,并意识到您使用的是 Windows 窗体应用程序。呸!现在我觉得有点傻。当然你只能有一个进程。想象一下,如果在启动 Outlook 后,您还必须启动 MailService.exe 才能真正获取邮件!

所以问题实际上是您的 Windows 窗体应用程序中 EventA 和 EventB 的处理时间截然不同。我不知道那是什么工作,但这对于客户端应用程序来说有点奇怪。

大多数情况下,它是一项需要大量处理的服务,并且客户端收到的任何消息都相当轻量级 - 类似于“实体 X 已更改,因此下次您需要直接从数据库”和处理它只需要从缓存中删除一些东西——当然不是一个长时间运行的过程。

但无论出于何种原因,您的客户端中的处理都需要更长的时间,这在 WinForms 应用程序中是令人担忧的,因为担心 UI 线程编组、阻塞 UI 线程等。

我建议您不要在 WinForms 应用程序的 NServiceBus 处理程序中执行所有处理,而是将其编组到其他地方的系统。将其作为工作项或类似的东西扔到 ThreadPool 中。或者将长时间运行的项目放入队列中,并以自己的速度对这些项目进行后台线程处理。这样,所有 NServiceBus 消息处理程序所做的就是“是的,收到消息。非常感谢。”那么,NServiceBus 消息是否一次处理一条就无关紧要了。

更新:

在 cmets 中,OP 询问如果在 NServiceBus 完成接收工作后将工作扔到 ThreadPool 会发生什么。也就是说,当然,这种方法的另一面——在 NServiceBus 完成后,你自己一个人——如果它在 ThreadPool 中失败,那么由你来创建自己的重试逻辑,或者只是捕获异常,提醒 WinForms 应用程序的用户,让它死掉。

显然这是最佳选择,但它引出了一个问题 - 您究竟想在 WinForms 应用程序中完成什么样的工作?如果 NServiceBus 提供的稳健性(有害消息的自动重试和错误队列)是这个难题的关键部分,那么为什么它首先会在 WinForms 应用程序中发生呢?此逻辑可能需要卸载到 WinForms 应用程序外部的服务,其中为每种消息类型(通过部署单独的服务)设置单独的队列变得容易,然后只有影响 UI 的部分会被发送回 WinForms 客户端。当发送到 UI 的消息只影响 UI 时,处理它们几乎总是微不足道的,并且您不需要卸载到 ThreadPool 来跟上。

直接谈到您在GitHub Issue 中描述的情况,这听起来确实像是每种消息类型的单独进程正是规定的解决方案。我听说部署和管理这么多流程听起来让人不知所措,但我想您会发现它并不像听起来那么糟糕。甚至还有一些优势 - 例如,如果您必须重新部署与 Amazon.com 的连接器,您只需要重新部署那个端点,其他任何一个都不会停机,或者担心可能会向其他端点引入错误。

为了简化部署,希望您使用的是持续集成服务器,然后签入诸如 DropkicK 之类的工具,这些工具可以帮助脚本化部署。就个人而言,我最喜欢的部署工具是老式的 Robocopy。即使像 1) NET STOP ServiceName, 2) ROBOCOPY, 3) NET START ServiceName 这样简单的东西也很有效。

【讨论】:

我正在考虑将 ThreadPool 作为工作项,但如果它在那里失败了怎么办?处理程序将完成它的工作,因此消息将从队列中删除,并且没有标准的方法可以将其放回那里(或者我错过了什么?)。我们使用 Windows 窗体作为流程管理工具 - 我在这里提到了更具体的示例:github.com/NServiceBus/NServiceBus/issues/219 感谢您的帮助。回答您的问题 - WinForms 用于让仪表板暂停/恢复,还可以查看销售渠道的当前状态 - 这是什么处理。我们找到了一种让多个接收器使用单独的 AppDomain 的方法,但总的来说,这是一种痛苦的解决方法。看起来我们必须要么接受 AppDomains 的解决方法,要么寻找 NServiceBus 的替代方案,要么重新考虑部署(现在我们有 cc.net + clickOnce)并将仪表板和处理程序设置为单独的部分。 我想你可能已经猜到我的建议是后一种选择。 ;-) 祝你好运!【参考方案2】:

你不需要,你只需要为接收器中的每个事件创建一个处理程序。例如

public class EventAHandler : IHandleMessages<EventA>

public class EventBHandler : IHandleMessages<EventB>

如果您想分离队列,则需要将每个处理程序放入单独的端点。这有意义吗?

另外,我认为您的图表需要一些工作。我敢肯定,这是一个标签,但我认为 Host 是实际的发布者,而不是客户端端点(您将其命名为 Publisher A 和 Publisher B)。

更新:这是一个简单的例子,但说明了我的意思 - https://github.com/sliedig/sof9411638

我已经扩展了 NServiceBus 附带的全双工示例,以包括三个额外的端点,其中两个分别订阅服务器发布的事件,一个端点处理这两个端点。 HTH。

【讨论】:

关于命名,这只是示例,所以您可能是对的,这些更像是发件人,主机是发布者。关于队列,如果两个事件都被推送到同一个队列中,是不是如果EventA先来,处理时间长,EventB会等到EventA处理完?我认为如果他们在不同的队列中,他们可以同时处理或者我错了吗? 如果您将其配置为使用多个线程,那么它们相互阻塞可能不会有太大问题。话虽如此,您可能仍然有大量 EventAs 涌入,这将占用所有线程并阻止 EventB 的处理。如果这是您非常关心的事情,您应该有一个单独的 EventB 接收端点。 好吧,据我了解,将其配置为使用多线程需要商业许可证 :) 而且我们没有那么多消息,为此购买商业许可证,仅 EventA 和 EventB 的平均处理时间非常不同的。您能否指出具有两个接收端点的接收器样本? 接收者不会有两个接收端点。只有两个接收器。在上图中,您将拆分 Receiver,以便让 ReceiverA(带有输入队列 ReceiverAQueue)订阅 EventA,而 ReceiverB(带有输入队列 ReceiverBQueue)订阅 EventB。然后可以根据每种事件类型的 SLA 要求单独扩展每个(逻辑)接收器(使用线程数或分配器和多个物理工作进程)。 @DavidBoike,如果你能给出一个非常简短的例子在代码中的样子,我会把它标记为答案。

以上是关于NServiceBus - 如何为每个接收者订阅的消息类型获取单独的队列?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Jaeger获得Nservicebus的痕迹?

应用程序启动后,如何为用户订阅主题以获取通知?

无法从 ASP.NET Core 6.0 API NServiceBus 端点订阅事件

如何为源和接收器下拉列表设置不同的 ckdDragPlaceholders?

如何为特定令牌设置静默通知?

如何为本地通知界面编写适当的测试?