寻找正确的 mule 组件以按 fifo 顺序解复用消息
Posted
技术标签:
【中文标题】寻找正确的 mule 组件以按 fifo 顺序解复用消息【英文标题】:Looking for correct mule component to demux messages in fifo order 【发布时间】:2015-02-01 15:54:36 【问题描述】:我一直在寻找用于解决以下问题的正确消息模式:
我有一个队列,其中包含域中每个用户 ID 的消息,每条消息都是一个 userChanged 事件。业务需求是必须按照 FIFO 顺序处理特定用户 ID 的所有消息,如果在处理特定用户 ID 的消息期间发生错误,则不应再进行处理,直到该错误被标记为已成功处理。必须处理所有消息,并且此解决方案需要部署在集群 ESB 环境中。
我想将这些事件解复用到每个用户 ID 的 FIFO 队列中,这样我就可以并行处理来自不同用户 ID 的消息,并按顺序处理用户的每个消息。
到目前为止,我已经提出了两种可能的解决方案,涉及 Mule 和 Rabbit,但两者都涉及自定义组件或我不知道的现有组件。
让 Mule 流读取第一个 RabbitMQ 队列上的所有消息,并使用 AMQP 入站端点从标头中获取用户 ID。然后它使用一个 AMQP 出站端点,如果它不存在,则动态创建一个持久队列,命名为:userChangedEvent-#[flowVars.userid]
,并将消息发布到该队列。自定义组件需要执行以下操作:
一种。创建一个共享对象映射,该映射将识别所有动态 amqp 侦听器流(如果它不存在)
湾。检查地图以查看是否存在用户标识的侦听器流实例。如果没有,则添加一个动态 amqp 侦听器流的实例,该实例将侦听userChangedEvent-#[flowVars.userid]
队列,使用 userid 作为键将其添加到映射中。
C。启动流程。
d。动态流需要配置为单线程配置处理,业务逻辑成功完成后手动确认消息,并在出现错误时停止流。
让一个 amqp 入站端点从队列中读取所有消息,然后“整理”路由器会将具有相同用户 ID 的消息路由到该用户 ID 的可重用业务逻辑流的同一实例,如果一个给定的用户 ID 不存在。这种情况下的悬而未决的问题是: 一种。这样的路由器是否存在或是否需要开发? 湾。业务逻辑流需要在单个线程中执行,因此对于流的每个实例,都需要以持久的方式维护积压的消息。 C。其中一个实例的执行错误应该会阻止该实例处理更多消息,直到它得到解决。
我已经想到了一些变通方法,例如将用户标识“分桶”到各种预定义队列 @987654324@ 以便我们可以预定义我们需要的所有流和关联的 amqp 侦听器并消除对动态流的需求,但这让我觉得不优雅的解决方案。
我觉得必须有一种消息传递模式来解决这个问题,但我撕掉了我的 EIP 副本,但无济于事!任何建议将不胜感激。
更新:这在概念上是我所追求的,一个带通道的解复用器(我称之为桶),但我认为动态通道创建(每个用户 ID 1 个)会更好:http://www.coralblocks.com/index.php/2014/06/demultiplexing-with-coralqueue-for-parallel-processing/
【问题讨论】:
【参考方案1】:您不可能同时拥有多个并发消费者并遵守先进先出顺序。但是,您可以使用称为独占消费者的功能拥有多个非并发消费者以提高可用性。您可以使用connector 的属性exclusiveConsumers
来激活它。
关于错误停止处理,我可以提出两种不同的方法:
如果此业务需求可能发生变化,您可以利用复杂的事件处理引擎作为事件信号量。 否则,您可以使用here 中描述的断路器模式,这可能是最简单的方法。【讨论】:
感谢 Victor 的建议,断路器看起来很有希望,CEP 方法听起来很有趣 - 你有任何文档可以指出吗? 我最初的问题可能不清楚。我不希望在同一个队列上获得多个并发消费者并尊重 fifo 排序。我想从第一个队列中取出消息流,并通过用户 ID 将它们分隔成流,这些流将按 fifo 顺序处理,并在出现异常时使用断路器模式停止对给定用户 ID 的处理。我建议使用队列(每个用户 ID 一个队列)或使用有状态路由器进行解复用。 Rabbitmq 将遵守插入顺序,因此默认配置应该没问题。可以在此处工作的文档中找到有关断路器的文档。 esper 上的示例是 blogs.mulesoft.org/mule-esper-cep-and-non-events 和 manning.com/dossot2/MuleinAction2E_CH14.pdf 感谢维克多的链接。请查看更新,我在其中发布了一条显示带通道的 Demuxer 的行,这在功能上确实是我所追求的:这在概念上是我所追求的,带通道的 demuxer(我称之为桶)但我认为动态车道创建(每个用户 ID 1 个)会更好:coralblocks.com/index.php/2014/06/… 我个人会使用 ActiveMQ 的消息组 (activemq.apache.org/message-groups.html) 之类的东西,但是现在可以在 rabbitmq (***.com/questions/20530591/…) 上使用。有了你所拥有的,我可能会在一个队列中接收消息,将元素放在另一个队列中,只有一个带有你想要的存储桶名称的标题。然后我会在不同的独占入站端点上利用消息选择器。以上是关于寻找正确的 mule 组件以按 fifo 顺序解复用消息的主要内容,如果未能解决你的问题,请参考以下文章