弹簧集成dsl缓冲区
Posted
技术标签:
【中文标题】弹簧集成dsl缓冲区【英文标题】:spring integration dsl buffer 【发布时间】:2019-04-18 17:00:58 【问题描述】:我有一个要求,我需要保留/缓冲在通道上接收到的消息,并根据消息数量保留在数据库中,或者超时意味着 1 分钟内没有收到消息。 有没有办法在spring集成中实现这一点
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(sourceQueue))
.transform(someTransform, "transform")
.handle(someService, "save")
.get();
【问题讨论】:
【参考方案1】:有一个基于Aggregator
EI 模式实现的.aggregate()
运算符。
您可以使用JdbcMessageStore
进行配置以缓冲消息并将它们存储到数据库中。
您可以通过ReleaseStrategy
(基于每条消息到达)将它们保留在那里直到某些条件,或者由于group timeout
而释放它们。
如果您不希望之后将它们全部作为一条聚合消息,您可以考虑使用 SimpleMessageGroupProcessor
,它只会生成 Collection<Message<?>>
并遍历它们以逐个发送到输出。
在参考手册中查看有关聚合器的更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
【讨论】:
我们想积累没有相关 ID 的消息,我们看到以下错误Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
与此更改 .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
你可以做一个静态correlationKey:.correlationStrategy(m -> 1)
。您还需要确保expireGroupsUponCompletion(true)
。
感谢.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
工作
如果没有.expireGroupsUponTimeout(true).groupTimeout(2000)
,上述方法就无法工作,即使我们使用TimeoutCountSequenceSizeReleaseStrategy
,我们是否真的需要再次调用groupTimeout
TimeoutCountSequenceSizeReleaseStrategy
仅在消息到达聚合器时执行其逻辑。 groupTimeout()
的点是在特定时间没有收到消息时有一些后台任务。以上是关于弹簧集成dsl缓冲区的主要内容,如果未能解决你的问题,请参考以下文章
使用 Reactor 2.0 在 Spring 4 上执行多线程