JMS 是不是可以“公平排队”

Posted

技术标签:

【中文标题】JMS 是不是可以“公平排队”【英文标题】:Is "fair queuing" possible with JMSJMS 是否可以“公平排队” 【发布时间】:2014-12-10 06:37:37 【问题描述】:

我需要实现一个公平排队系统,以便根据某个消息标头的值,针对当前排队的消息上该标头的所有值,以循环方式处理消息。

系统中的消息自然地按某些属性进行分组,其中有数千个可能的值,并且当前排队的消息的值集随时间而变化。类比是在创建消息时具有一个标题的消息,该标题是时间的毫秒部分。因此,标头的值将介于 0 和 999 之间,并且该值将在当前排队的所有消息中进行某种分布。

我需要能够按顺序使用消息,这样没有特定值优先于任何其他值。如果排队消息的头值是这样分布的

value | count
------|-------
  A   |   3
  B   |   3
  C   |   2

那么消费订单就是A,B,C,A,B,C,A,B

如果将具有其他值的消息添加到队列中,它们应自动添加到循环序列中。

这意味着对当前排队的消息有一些了解,但不需要消费者掌握这些知识;代理可能具有以某种方式订购交付的机制。

有一些阈值是可以接受的,超过该阈值公平排队开始。也就是说,如果阈值为 10,则可以顺序处理 10 条相同值的消息,但处理的第 11 条消息应该是顺序的 next 值。 下一个可能是相同的值,如果唯一排队的消息具有该值。

可能值的数量可能排除了简单地为每个值创建一个队列并迭代队列,尽管这尚未经过测试。

我们正在使用 HornetQ,但如果有提供这些语义的替代方案,那么我很想知道。

消息是作业,标头值是用户 ID。正在寻求的是,在一定限度内,任何给定用户的工作都不会过度延迟任何其他用户的工作;产生 100 万个作业的用户不会导致其他用户的后续作业等待该百万个作业的处理。

HornetQ 中队列上的消费者按创建顺序进行评估,因此向队列中添加选择性消费者不会阻止任何包罗万象的消费者接收与过滤器匹配的消息。

JMS 组似乎没有帮助,因为它将给定组(用户?)与给定消费者联系在一起。

一个潜在的解决方案是根据需求在某个主题上创建选择性消费者(例如:来自同一用户的 10 条连续消息),并管理所有选择性消费者的生命周期,以确保包罗万象不会处理相同的消息.虽然这似乎确实有一些繁重的同步要求。

【问题讨论】:

我从您的示例中推测 FIFO 不是默认选项?这仅仅是因为可能有一个用户产生了异常的请求高峰吗?该用户是可预测的吗?他们应该有自己的队列/主题进行单独处理吗?消息显然没有隐式顺序,因为您可以无序处理它们(在您的示例中按用户),那么多线程阅读服务是否不起作用? 【参考方案1】:

要考虑的第一个选项是使用多线程消费应用程序。假设每个会话/消费者有一个线程,则可以使用选择器设置同步或异步接收。每个选择器将被键入到特定用户。

假设 JVM 在线程调度方面是合理公平的(我很乐意假设)并且应用程序代码中没有任何死锁,我会断言要求将得到满足。一个线程可能会被用户数百万个作业卡住,其余的不会受到影响。

但是,如果需要单线程应用程序,那么 JMS 规范本身就没有任何帮助。当然,它们可能是供应商扩展,这可能会有所帮助。然而,另一个选项是应用程序查看每条消息并将其放入用户 ID 的特定队列。然后,最终的消费应用程序本身将在这些队列之间“循环”以获得工作。需要另一个应用程序,但您有一个非常确定的系统。

【讨论】:

多线程是我所期望的,但我不能简单地为每个用户分配一个线程,因为有成千上万的用户。虽然 JVM 很可能能够处理数千个线程,但将 GB 的 RAM 纯粹用于线程堆栈是相当浪费的。每个用户的队列当然是为每个用户安排有序交付的一种方式,但可能涉及对空队列的大量迭代。 因此,一个选项可能是使用精心构建的主题树。也许能够将用户分组在一起,这样您就可以冒一些用户中的一个引起问题的风险,但总体上更加公平。如果您使用消息侦听器,那么根据队列的实现,有很多空白队列可能不是问题。我同意不断轮询空队列会很糟糕,但如果有一个回调机制服务器到客户端就可以了。【参考方案2】:

我建议使用消息优先级 + 设置 consumerWindowSize=0 并始终让客户端从服务器选择一条新消息。

消息的延迟会更长,但消息总是来自服务器。

请注意,您需要考虑一些种族。如果您正在消费消息 C 并且消息 B 到达怎么办?然后,您将消耗 C 以供以后消耗 B。但是您可以在那里做的不多,因为 C 已经在消耗中了。

您也可以考虑消息分组,但您会将消息组绑定到来自负载平衡的单个使用者。

【讨论】:

JMS 优先级是在提交消息时设置的,这意味着要实现公平排队,因为我们希望提交者需要了解队列的当前状态,这是不切实际的。我现在正在使用基于消息组的系统进行测试,但正如您所说,这些组与特定消费者相关联存在一些问题。【参考方案3】:

您希望 JMS 代理实现消息传递算法(公平排队),据我所知,该算法不是 JMS 规范的一部分。有可能找到一种方法让经纪人这样做,但我对此表示怀疑,而且您提出的任何解决方案都可能是特定于经纪人的。

相反,为什么不将所需的排队算法放在您自己的应用程序中呢?例如:编写一个“公平队列转发器 (FQF)”应用程序,订阅所有消息,无论它们来自代理的顺序如何。让此 FQF 应用程序尽可能快地使用消息,以便 JMS 代理队列始终为空或接近空。然后,FQF 应用程序可以将消息存储在本地队列中,并以您所需的排队算法确定的任何顺序一次将它们重新发布到最终消息处理应用程序订阅的队列或主题。为此,您可能希望使用事务或某种流控制,以便 FQF 应用程序仅以终端系统可以处理的速率发布消息。

根据您的示例,消息表示要根据消息头中的用户 id 属性按特定顺序处理的作业。所以我建议你编写一个作业调度算法,使用你想要的任何排队算法将作业传递给作业处理器。

这样,您可以完全控制消息处理顺序,而不必以某种方式“诱骗”代理做您想做的事情。您只需将 JMS 用作消息传递机制,因此您不必编写自己的消息传递协议。

【讨论】:

【参考方案4】:

我不确定我是否完全理解这一点,但是您的属性有数千个可能的值并且会随着时间而变化,这是有问题的。这听起来像是一个典型的“拯救哈希值”问题。那么如何创建该属性的哈希值,然后对该值进行取模以得出一个公平但先前已知的值?

假设让 100 个消费者处理 100 个队列(命名为 Q0 到 Q99)是可行的。然后您可以在您的 JMS 生产者中执行此操作:

String queueName = "Q" + user.hashCode()%100;

这是生产者发送到的队列名称。用户值也作为属性添加。同一个用户将进入同一个队列(如果有太多则排队),并且用户几乎公平地分布在消费应用程序中。

现在您仍然面临一个不良用户创造一百万个工作岗位的问题。此解决方案的第二部分可能是启动时为空的 JMS 选择器。一旦消费了第一条消息,您就会计算每个用户的作业数量,并且一旦达到阈值,例如来自同一用户的 10 个作业,您可以通过添加一个选择器来暂时禁止该用户,例如:'user NOT LIKE user123'。如果有不止一次这样的用户,您可以使用“AND”来累积选择。一旦消费者没有收到任何进一步的消息,您将这个消费者的选择设置为空并再次开始处理队列。

【讨论】:

以上是关于JMS 是不是可以“公平排队”的主要内容,如果未能解决你的问题,请参考以下文章

Linux磁盘IO调度算法

JMS

消息中间件JMS

使用 Spring JMS 进行错误处理的最佳实践

使用 JMS 队列的潜在陷阱?

消息中间件ActiveMQ及Spring整合JMS的介绍