弹簧集成dsl缓冲区

Posted

技术标签:

【中文标题】弹簧集成dsl缓冲区【英文标题】:spring integration dsl buffer 【发布时间】:2019-04-18 17:00:58 【问题描述】:

我有一个要求,我需要保留/缓冲在通道上接收到的消息,并根据消息数量保留在数据库中,或者超时意味着 1 分钟内没有收到消息。 有没有办法在spring集成中实现这一点

IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(sourceQueue))
                .transform(someTransform, "transform")
                .handle(someService, "save")
                .get();

【问题讨论】:

【参考方案1】:

有一个基于Aggregator EI 模式实现的.aggregate() 运算符。

您可以使用JdbcMessageStore 进行配置以缓冲消息并将它们存储到数据库中。

您可以通过ReleaseStrategy(基于每条消息到达)将它们保留在那里直到某些条件,或者由于group timeout而释放它们。

如果您不希望之后将它们全部作为一条聚合消息,您可以考虑使用 SimpleMessageGroupProcessor,它只会生成 Collection<Message<?>> 并遍历它们以逐个发送到输出。

在参考手册中查看有关聚合器的更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator

【讨论】:

我们想积累没有相关 ID 的消息,我们看到以下错误 Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing? 与此更改 .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2)) 你可以做一个静态correlationKey:.correlationStrategy(m -> 1)。您还需要确保expireGroupsUponCompletion(true) 感谢.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))工作 如果没有.expireGroupsUponTimeout(true).groupTimeout(2000),上述方法就无法工作,即使我们使用TimeoutCountSequenceSizeReleaseStrategy,我们是否真的需要再次调用groupTimeout TimeoutCountSequenceSizeReleaseStrategy 仅在消息到达聚合器时执行其逻辑。 groupTimeout()的点是在特定时间没有收到消息时有一些后台任务。

以上是关于弹簧集成dsl缓冲区的主要内容,如果未能解决你的问题,请参考以下文章

buffer的作用

使用 Reactor 2.0 在 Spring 4 上执行多线程

字节缓冲区的大小和形状不匹配。我在将我的 Ml 模型集成到 android 应用程序时遇到了这个错误

Kafka集成优化

使用缓冲区和溢出缓冲区将 IMU 数据写入 csv 文件?

NVIDIA 卡上不同的 OpenGL 帧缓冲区 RGBA 值