RabbitMQ 如何将作业拆分为任务并处理结果

Posted

技术标签:

【中文标题】RabbitMQ 如何将作业拆分为任务并处理结果【英文标题】:RabbitMQ how to split jobs to tasks and handle results 【发布时间】:2017-11-11 21:21:56 【问题描述】:

我在基于 Spring 的 Web 应用程序上有以下用例:

我需要应用 Competing Consumers EIP 并进行以下改动:队列中的消息实际上是属于同一作业的拆分任务。因此,我需要正确跟踪作业的所有任务何时完成及其完成状态,以便将场景保存为 COMPLETED 或 FAILED,记录结果并通过例如通知相应地向用户发送电子邮件

所以,鉴于我上面描述的要求,我的问题是:

    这可以用 RabbitMQ 完成吗?如果可以,怎么做?

【问题讨论】:

在这种情况下,我只需为每个作业放置一个唯一 ID,将此 ID 附加到与作业相关的队列中的每条消息,在每个任务上放置一个状态标志为“进行中”,然后打开最后一个“完成”。您只需定义“失败”的含义并相应地实施规则 @asettouf - 如果我遵循“竞争消费者”模式,也就是“工作队列”rabbitmq.com/tutorials/tutorial-two-java.html,作业的任务将并行执行,这正是我所需要的。所以,考虑到这一点,我看不出你的提议是如何运作的 顺便说一句,如果某人的回答解决了您的问题,您可能希望使用大复选框接受它作为答案。它有助于将注意力集中在 *** 上未回答的问题上。 【参考方案1】:

我创建了一个快速的gist 来展示一个非常粗略的例子来说明如何做到这一点。在这个例子中,有一个生产者和两个消费者,2个队列,一个由生产者发送(“SEND”),由消费者消费,反之亦然,消费者发布到“RECV”队列,由生产者消费.

现在请记住,这是一个非常粗略的示例,因为在这种情况下,生产者只发送一个作业(0 到 5 之间的随机任务数量),然后阻塞直到作业完成。避免这种情况的一种方法是在 Map 中存储作业 ID 和任务数,并且每次检查每个作业 ID 报告的已完成任务数。

【讨论】:

感谢您的反馈。我会尽快评估,看起来是一个可以接受的有用答案。 @kmandalas 没问题,如果你想要完整的项目,请告诉我,尽管 pom.xml 非常简单。另外请记住,我并没有尝试改进或使其尽可能高效,请更多地将其作为概念证明 我认为你的想法是一个合乎逻辑的 PoC。现在,我尝试将它与我在此站点上发现的内容联系起来:blog.zenika.com/2012/03/15/pdf-workers-with-rabbitmq,因为我使用的是 Spring 和相关框架。【参考方案2】:

您正在尝试做的事情超出了 RabbitMQ 的范围。 RabbitMQ 用于发送和接收具有排队能力的消息。 它无法为您跟踪您的工作任务。

您需要拥有“工作存储”服务。每当您的消费者完成任务时,它都会更新 Job Storage 服务,将任务标记为已完成。作业存储服务知道作业中有多少任务,并且当最后一个任务完成时,将作业成功完成。在此服务中,您还将实现所有其他业务逻辑,例如何时将作业视为失败。

【讨论】:

当然需要实现服务。然而,经过进一步研究,我发现通过将“工作队列”与“请求-回复”模式结合起来,当然还有一些服务,我可以找到解决方案。例如:blog.zenika.com/2012/03/15/pdf-workers-with-rabbitmq 如果在消费者处理消息后发布者端需要发生一些事情,是的,消费者可以将响应发送回发布者。

以上是关于RabbitMQ 如何将作业拆分为任务并处理结果的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq之工作队列(轮询发送消息)

四RabbitMQ中模式—工作队列(Work Queues)模式

Future配合线程池进行多线程任务并返回结果

如何将连续变量拆分为等长(定义数量)的区间并仅在 R 中列出带有切点的区间?

将 n 个任务添加到 celery 队列并等待结果

RabbitMQ实现的RPC