如何从现实世界中的多个队列中读取?

Posted

技术标签:

【中文标题】如何从现实世界中的多个队列中读取?【英文标题】:How to read from multiple queues in real-world? 【发布时间】:2012-06-13 11:06:06 【问题描述】:

这是一个理论问题:

当我使用消息队列构建应用程序时,我将需要多个队列支持不同的数据类型以用于不同的目的。假设我有 20 个队列(例如,一个用于创建新用户,一个用于处理新订单,一个用于编辑用户设置等)。

我将使用“最少”1 个 Web 角色和 1 个辅助角色将其部署到 Windows Azure。

如何以正确的方式读取所有这 20 个队列?这是我的想法,但我几乎没有或根本没有这方面的实际实践经验:

创建一个在工作角色“主”类中产生 20 个线程的类。让这些线程中的每一个执行一个方法来轮询不同的队列,并让所有这些线程在每次轮询之间休眠(当然还有增加休眠时间的退避机制)。

这导致有 20 个线程(或 21 个?)和 20 个正在被主动轮询的队列,从而导致大量浪费的消息(每次轮询一个空队列时,它都被计为一条消息)。

你如何解决这个问题?

【问题讨论】:

【参考方案1】:

我阅读了其他答案(非常好的答案)并想对此发表自己的看法。

坚持使用 Windows Azure 队列,正如 @Lucifure 所描述的那样:我真的不认为需要多个队列,除了两种情况:

您需要不同的优先级。您最不想要的就是一条高优先级消息被困在数百条低优先级消息后面。为这些创建一个高优先级队列。 消息读取+删除的数量将超过每秒 500 个事务的目标。在这种情况下,创建多个队列,以将事务量分散到存储分区(存储帐户每秒将处理超过 5K 的事务)。

如果您坚持使用单个队列(基于存储,而不是服务总线),您可以一次读取消息块(最多 32 个)。您可以轻松制定一种格式来帮助您区分消息类型(可能带有简单的前缀)。然后,只需将消息交给适当的线程进行处理。服务总线队列没有多消息读取,尽管它们允许预取(这会导致缓冲消息被下载到缓存中)。

一个队列优于多个队列:您消除(或大大减少)“许多队列没有消息,导致空读取”的问题。

如果您需要更高的吞吐量,您可以随时增加执行队列读取和分派的线程数。

记住每次删除都是原子的;没有批处理。就队列轮询而言:您考虑退避是正确的。成功阅读消息(或消息块)后,您无需后退。当您尝试阅读后没有得到任何东西时,请退后。

与服务总线队列相比,一个很好的优势是:Windows Azure 队列为您提供了一个近似的消息计数(这在考虑向多个实例横向扩展时非常有用)。服务总线队列不提供此功能。

【讨论】:

但是当你收到消息时,你怎么知道如何处理它呢?假设我们有一个存储 3 种类型对象的队列:订单、客户、产品。我们收到一个“产品”对象。您如何知道是否应该添加、更新或删除产品?除了为每个目的创建一个队列之外,我认为没有干净的方法来处理这个问题。 只需用一些独特的前缀格式化消息。然后,您的队列读取代码会查看前缀并决定如何处理每条消息。例如:RENDER|\raw\image1.jpg|\rendered\image1.jpgTHUMBNAIL|\raw\image1.jpg|\thumbs\image1.jpg。通过分隔符'|' 解析,检查它是哪种消息类型,并将其传递给适当的线程。注意:队列消息是二进制或字符串。想出你想要的任何格式。举个简单的例子。 嗯,感觉非常非常非常脏。我希望有一个更清洁的解决方案,奇怪的是那里(显然)没有。 broredMessage 具有 ContentType 属性。消息的类型可以是 CreateOrder、UpdateOrder、DeleteOrder、CreateCustomer、UpdateCustomer 等...【参考方案2】:

另一种策略是使用单个或更少的队列,这样一个队列可以支持多于一种类型的消息。如果您的系统架构能够支持,这种方法更易于管理且成本更低。

在现实世界中,我已经成功地使用了多个队列(出于可扩展性目的),每个队列在由计时器事件触发的单独线程上读取。根据队列的负载和应用程序的需要,定时器事件被更改为以动态变化的时间间隔为队列提供服务。

【讨论】:

您将哪种数据放入队列(即,如果您将所有不同类型混合在一起,您的工作人员如何知道它将从队列中取出哪种类型)? 如果您使用服务总线队列,您正在使用 BrokeredMessage。这个 BrokeredMessage 对象有一个名为 Properties 的属性,您可以在其中添加您在接收消息时可以使用的自定义信息。 基本上使用自描述消息...我使用了一个封装了实际消息对象的信封类,类似于信封类。信封包含实际的消息对象及其类型名称,并被序列化为 XML 并放入队列中。在读取时,解析 XML 以获取类型名称,然后反序列化为实际消息并分派给消息处理程序。相当复杂,但可以使用更简单的方法。 Sandrino 关于将 Brokered Message 与 Service Bus Queues 结合使用的评论也是一个非常可行的策略。【参考方案3】:

如果存储队列的回退机制不足以满足您的需求,我建议您考虑使用服务总线队列。使用服务总线队列,您不必进行如此激进的轮询。

您仍然需要实现一个循环来轮询队列,但接收超时使其比使用存储队列时的持续轮询机制更轻松。

在以下示例中,我尝试从队列中接收消息。如果没有找到消息,它将保持连接打开 30 秒,以查看是否有新消息进入。如果 30 秒后没有消息到达,则 Receive 方法将返回 null(我将有一个循环尝试再次调用 Receive)。请注意,最长超时时间为 24

MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials); 
QueueClient myQueueClient = factory.CreateQueueClient("TestQueue");
myQueueClient.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 30));

为要读取的每个队列弹出线程是个好主意,但是看到 CLR 线程池的容量限制,您还应该考虑异步接收消息(例如使用 TaskFactory.FromAsync ):http://msdn.microsoft.com/en-us/library/windowsazure/hh851744.aspx

【讨论】:

我的问题并不是关于回退机制,而是关于“从多个队列中读取”部分。您是否有一个如何使用 FromAsync 实现此功能的示例?那么当我们有 200 个队列而不是 20 个时呢?你如何在不创建太多线程的情况下解决这个问题? 我现在看到你的例子还不够好,我认为这只是关于 TPL,但它是关于 Azure。现在阅读:)

以上是关于如何从现实世界中的多个队列中读取?的主要内容,如果未能解决你的问题,请参考以下文章

java面向对象思想如何理解?

现实世界中的 WCF 故障合约

如何让强化学习走进现实世界?

我意识中的未来世界

现实世界中哪些地方用到了Java?

java基础---抽象和封装