在其他几条消息完成后发送消息而不使用外部存储?
Posted
技术标签:
【中文标题】在其他几条消息完成后发送消息而不使用外部存储?【英文标题】:Sending a message after several other messages have completed without utilizing an external store? 【发布时间】:2021-11-25 12:25:39 【问题描述】:我有一个应用程序,它应该使用 JMS 将几个长时间运行的任务异步排队以响应特定请求。其中一些任务可能会在几秒钟内完成,而其他任务可能需要更长的时间才能完成。在所有任务启动后,原始请求应该已经完成(即启动任务的消息已排队) - 即我不想在任务执行时阻止请求。
但是,现在,一旦所有消息都已成功处理,我想对每个请求执行另一个操作。为此,我想将另一条消息发送到另一个队列 - 但只有在所有消息都已处理之后。
所以我所做的有点类似于回复-响应模式,但不完全是:多条消息的响应(在同一个事务中排队)一旦全部完成,就应该在一个事务中聚合和处理可用的。另外,我不想通过等待回复来“阻止”将消息排队的事务。
我的第一个天真的方法如下:
当请求进来时:
-
为要执行的 n 个操作中的每一个排队 n 条消息。为他们提供所有相同的关联 ID。
将 n(即发送的消息数)与消息的相关 ID 一起存储在数据库中。
成功完成请求
每个工人都会做以下事情:
-
从队列中接收消息
做需要做的工作来处理消息
根据关联 ID 减少存储在数据库中的计数器。
如果计数器达到零:向已完成队列发送“COMPLETED”消息
但是,我想知道是否有不需要数据库(或任何其他类型的外部存储)来跟踪所有消息是否已被处理的替代解决方案。
JMS 是否提供了一些可以帮助我解决此问题的功能? 或者在这种情况下我真的必须使用数据库吗?
【问题讨论】:
【参考方案1】:如果您的系统是分布式的,而且我认为是分布式的,那么如果没有像您实现的那样的某种全局锁存器锁,就很难解决这个问题。需要注意的主要事情是“任务”必须在“全局存储”中发出信号,表明它们已经结束。您的应用程序本质上是在每次通过在 db 中插入一行新请求时创建一个新的倒计时闩锁实例(由 CorrelationID 标识)。你的任务是通过计数锁定来“发出”工作结束的信号。结束持有锁的作业必须清理行。
现在全局存储不一定是数据库,但它仍然必须是某种全局访问状态。你必须继续计数。如果你只有一个 JMS,你必须通过发送消息来创建闩锁和倒计时。
想到的最简单的解决方案是让每个作业向 JOBS_FINISHED 队列发送 TASK_ENDED 消息。 TASK_ENDED 消息代表:“由具有 CorrelationID Z 的请求 Y 触发的任务 X 已结束”信号。就像在 db 中倒计时一样。此 q 的接收者是一项特殊任务,其唯一工作是在收到具有给定关联 ID 的请求的所有消息时触发 COMPLETED 消息。所以这个工作只是按顺序读取消息。并计算它遇到的每个唯一相关 id。一旦计数到预期的数字,它应该清除该计数器并发送 COMPLETED 消息。
您可以在处理请求时创建的消息的 JMS 标头中对触发任务的数量和任何其他细节进行编码。例如:
// pretend this request handling triggers 10 tasks
// here we are creating first of ten START TASK messages
TextMessage msg1 = session.createTextMessage("Start a first task");
msg1.setJMSCorrelationID(request.id);
msg1.setIntProperty("TASK_NUM", 1);
msg1.setIntProperty("TOTAL_TASK_COUNT", 10);
您只需将该信息传递给 TASK_ENDED 消息,一直到最终工作。您必须确保发送到结束作业的所有消息都被接收到同一作业实例。
你可以从这里开始,通过发布订阅消息、错误处理和临时队列之类的东西来扩展想法,但这对你的需求来说变得非常具体,所以我会在这里结束。
【讨论】:
以上是关于在其他几条消息完成后发送消息而不使用外部存储?的主要内容,如果未能解决你的问题,请参考以下文章
Strophe & Ejabberd:发送消息后重新认证的问题