当所有其他消息完成处理时,Spring集成处理消息

Posted

技术标签:

【中文标题】当所有其他消息完成处理时,Spring集成处理消息【英文标题】:Spring integration process message when all other messages finished processing 【发布时间】:2021-06-09 11:31:14 【问题描述】:

当所有其他消息(其他类型)完成处理时,Spring 集成图中的组件是否会首先开始处理消息(一天结束)?在这个问题中,我们必须考虑 spring 集成可以启动多个线程。另一个限制是该组件将用于我无法控制的图形中。所以我不能说:

“其他类型”消息处理需要多长时间 如果某些消息运行出错 只是被一些过滤器丢弃 与发布-订阅频道相乘 如果使用了一些TaskExecutor(引入新的线程和事务边界) 没有我可以检查它是否存在的最终工件

当“一天结束”到达我的组件时,“其他类型”消息可能仍在处理中。即使我的组件位于图表的末尾,运行它的消息也可能不会到达那里。消息被重复的其他可能性,我不知道多少次。因此,我不知道我应该等待“结束一天”处理多长时间。

其他工具/框架也有可能使这个问题变得更容易或完全消除它。

【问题讨论】:

你给了我们不可能完成的任务:en.wikipedia.org/wiki/Go_There,_Don%27t_Know_Where。如果您完全没有来自这些流程的任何挂钩来确保它们已完成,那么如何为您提供一些解决方案?你可能有类似“空闲标志”的东西来查看应用程序中没有更多活动,但由于你无法修改该解决方案,你注定要失败。如果我误解了您的问题,请纠正我。 如果有这个不可能完成的任务的解决方案会很高兴 :-) 正在考虑检查任务执行器是否所有线程都是空闲的,但可能有几个任务执行器,其中一些不涉及。我们想出的下一个最佳解决方案是在图表开头和组件之前计算消息。如果这两个匹配(+错误通道),则处理所有消息。 【参考方案1】:

正在考虑检查任务执行器是否所有线程都空闲,但可能有多个任务执行器,其中一些没有涉及。

当您声称您使用的过程是一个黑匣子时,这听起来与您在问题中所说的无关。

如果您真的可以通过某种方式访问​​该流程,例如通过ChannelInterceptor:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-interceptors,所以你可以有一些像AtomicBoolean active这样的全局bean在流程开始时设置并在结束时重置。因此,您的“在一天结束时”消息将定期轮询此标志以确定何时发送它。您只需简单地使用@InboundChannelAdapter 来生成您的消息或null 当标志为false 时。

【讨论】:

以上是关于当所有其他消息完成处理时,Spring集成处理消息的主要内容,如果未能解决你的问题,请参考以下文章

Spring 消息

第17章-Spring消息

Spring集成消息处理链的用法?

在主题中保留消息,直到所有消费者完成处理

当spring批处理远程分块工作者应用程序启动时不要运行IntegrationFlow

spring-amqp手动停止RabbitListener