如何在我的 C# 应用程序中限制来自 ActiveMQ 的消息量?

Posted

技术标签:

【中文标题】如何在我的 C# 应用程序中限制来自 ActiveMQ 的消息量?【英文标题】:How can I throttle the amount of messages coming from ActiveMQ in my C# app? 【发布时间】:2011-05-15 17:02:14 【问题描述】:

我在 .Net 程序中使用 ActiveMQ,我被消息事件淹没了。

简而言之,当我收到队列事件“onMessage(IMessage receivedMsg)”时,我将消息放入一个内部队列,X 线程在其中执行它们的工作。

起初我在创建会话时有:'AcknowledgementMode.AutoAcknowledge',所以我猜测队列中的所有消息都被吸入并放入内存队列(这是有风险的,因为崩溃,一切都丢失了)。

然后我在创建会话时使用:'AcknowledgementMode.ClientAcknowledge',当工作人员准备好消息时,它会调用消息上的'commit()' 方法。但是,仍然会从队列中吸出所有消息。

我如何配置它只处理 X 数量的消息或在内部队列中,而不是立即“下载”所有内容?

【问题讨论】:

虽然我从来没有接触过 ActiveMQ ——你能不能不翻一下:用 ActiveMQ 监听器设置 N 个线程,并在监听事件中处理消息? @will Hughes:我的整个程序都是以异步方式编写的。因此,即使是数据库访问也是通过回调异步完成的。所以我不能留在活动中。 【参考方案1】:

您使用的是 .NET 4.0 吗?您可以使用 BlockingCollection 。将其设置为它可能包含的最大数量。一旦线程尝试放入多余的元素,添加操作将阻塞,直到集合再次低于阈值。

也许这样可以进行节流?

Rx框架中也有一个用于节流的API,但不知道它是如何实现的。如果您将 Queue 源实现为 Observable,则该 API 将可供您使用,但我不知道这是否符合您的需求。

【讨论】:

很好的答案!奇迹般有效。谢谢【参考方案2】:

您可以设置客户端预取来控制客户端将发送多少条消息。当 Session 处于 Auto Ack 时,客户端只会在通过 onMessage 回调或通过同步接收将消息传递到您的应用程序后才确认消息。默认情况下,客户端将从代理预取 1000 条消息,如果客户端出现故障,这些消息将重新传递给另一个客户端,这是一个队列,否则对于一个主题,它们将被丢弃,因为主题是基于广播的频道。如果您将预取设置为一个,那么您的客户端只会从服务器发送一条消息,然后每次您的 onMessage 回调完成时,都会发送一条新消息,因为客户端会确认该消息,即如果会话处于自动确认状态模式。

有关所有选项,请参阅 NMS 配置页面: http://activemq.apache.org/nms/configuring.html

问候

蒂姆。 FuseSource.com

【讨论】:

bish:不幸的是,即使我设置为 clientacknowledge,队列也会触发与队列中的消息一样多的事件。我用队列中的 10.000 个消息对其进行了测试,它们会在我不承认任何事情的情况下被解雇。停止它的唯一方法是停止 onMessage 直到我处理它。 听起来你的编程模型会更适合使用同步接收调用,然后你可以随意限制它。消息系统的重点是尽可能快地传递消息,您的应用需要处理节流。

以上是关于如何在我的 C# 应用程序中限制来自 ActiveMQ 的消息量?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用过滤器限制来自地址栏的所有请求

来自运行时数据层的 C# 中的动态 odata 服务

如何在 ASP.NET 中显示来自 C# 的警告框?

如何在 C# 中获取当前用户的 Active Directory 详细信息

在数据流任务中,如何使用来自另一个源的值限制行流?

是否可以在 C# 中限制公共枚举值?