使用Spring集成组件关联2个JMS队列之间的消息
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spring集成组件关联2个JMS队列之间的消息相关的知识,希望对你有一定的参考价值。
我有2个JMS队列,我的应用程序使用Jms.messageDrivenChannelAdapter(...)
组件订阅它们。
第一个队列接收Paid
类型的消息。第二个队列接收Reversal
类型的消息。
业务场景定义了Paid
类型和Reversal
类型的消息之间的相关性。
Reversal
应该等待Paid
才能被处理。
如何使用Spring Integration实现这种“等待”模式?
是否可以关联2个JMS队列之间的消息?
见the documentation about the Aggregator。
聚合器使用某种关联策略关联消息,并根据某种释放策略释放该组。
聚合器通过关联和存储它们来组合一组相关消息,直到该组被认为是完整的。此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。
默认情况下,输出有效负载是分组消息有效负载的列表,但您可以提供自定义输出处理器。
编辑
@SpringBootApplication
public class So55299268Application {
public static void main(String[] args) {
SpringApplication.run(So55299268Application.class, args);
}
@Bean
public IntegrationFlow in1(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("queue1"))
.channel("aggregator.input")
.get();
}
@Bean
public IntegrationFlow in2(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("queue2"))
.channel("aggregator.input")
.get();
}
@Bean
public IntegrationFlow aggregator() {
return f -> f
.aggregate(a -> a
.correlationExpression("headers.jms_correlationId")
.releaseExpression("size() == 2")
.expireGroupsUponCompletion(true)
.expireGroupsUponTimeout(true)
.groupTimeout(5_000L)
.discardChannel("discards.input"))
.handle(System.out::println);
}
@Bean
public IntegrationFlow discards() {
return f -> f.handle((p, h) -> {
System.out.println("Aggregation timed out for " + p);
return null;
});
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
send(template, "one", "two");
send(template, "three", null);
};
}
private void send(JmsTemplate template, String one, String two) {
template.convertAndSend("queue1", one, m -> {
m.setJMSCorrelationID(one);
return m;
});
if (two != null) {
template.convertAndSend("queue2", two, m -> {
m.setJMSCorrelationID(one);
return m;
});
}
}
}
和
GenericMessage [payload = [two,one],headers = {jms_redelivered = false,jms_destination = queue:// queue1,jms_correlationId = one,id = 784535fe-8861-1b22-2cfa-cc2e67763674,priority = 4,jms_timestamp = 1553290921442,jms_messageId = ID:Gollum2.local-55540-1553290921241-4:1:3:1:1,时间戳= 1553290921457}]
2019-03-22 17:42:06.460 INFO 55396 --- [ask-scheduler-1] o.s.i.a.AggregatingMessageHandler:使用correlationKey [3]使MessageGroup过期
聚合超时三次
以上是关于使用Spring集成组件关联2个JMS队列之间的消息的主要内容,如果未能解决你的问题,请参考以下文章