Spring Integration:如何一次处理多条消息?

Posted

技术标签:

【中文标题】Spring Integration:如何一次处理多条消息?【英文标题】:Spring Integration: how to process multiple messages at one time? 【发布时间】:2014-10-23 05:31:32 【问题描述】:

我有以下配置:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
    <int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>

而消息处理程序定义为

@Override
public void handleMessage(Message<?> message) throws MessagingException 
    Object payload = message.getPayload();
    if (payload instanceof LogEntry) 
        LogEntry logEntry = (LogEntry) payload;
        String app = (String) message.getHeaders().get("app");
        logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
        logEntryPostProcessService.postProcess(app, logEntry);
     else 
        throw new MessageRejectedException(message, "Unknown data type has been received.");
    

我想要的是类似的东西

@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException 
...

所以基本上轮询器在一次调用中发送所有 10 条消息,而不是每条消息调用该方法 10 次。

这样做的原因是有可能批量处理块中的所有消息,从而提高性能。

【问题讨论】:

【参考方案1】:

感谢@Artem Bilan,这里是最终解决方案:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- 
    the poller will process 100 messages every minute 
    if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
    send-partial-result-on-expiry="true"
    group-timeout="60000"
    correlation-strategy-expression="T(Thread).currentThread().id"
    release-strategy-expression="size() == 100">
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>

<int:channel id="logEntryAggrChannel"/>        

<!-- the payload is a list of log entries as result of the aggregator -->
<int:outbound-channel-adapter channel="logEntryAggrChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>

根据评论(在上面的代码中),我必须设置 group-timeout/send-partial-result-on-expiry,因为某些组是使用 Thread ID 形成的,但从未处理过,因为它们没有达到条件大小 == 100。

【讨论】:

【参考方案2】:

确实如此,因为 (AbstractPollingEndpoint):

taskExecutor.execute(new Runnable() 
    @Override
    public void run() 
        int count = 0;
        while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) 
...
            if (!pollingTask.call()) 
                break;
            
...
    
);

因此,您的所有消息 (max-messages-per-poll) 都在同一个线程中处理。 但是,它们会被一个一个地发送给处理程序,而不是作为一个整体发送给处理程序。

要并行处理,您应该在 logEntryPostProcessorReceiver 之前使用 ExecutorChannel。像这样的:

<channel id="executorChannel">
   <dispatcher task-executor="threadPoolExecutor"/>
</channel>

<bridge input-channel="logEntryChannel" output-channel="executorChannel">
   <poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>

<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>

更新

要将消息作为一批处理,您应该aggregate 他们。由于它们都是polling endpoint 的结果,因此消息中没有sequenceDetails。你可以用correlationId 的一些假值来克服它:

<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
        release-strategy-expression="size() == 10"/>

其中size() == 10 应该等于max-messages-per-poll

之后您的logEntryPostProcessorReceiver 必须应用payloads 的list。或者只是一条消息,其中payload&lt;aggregator&gt; 的结果列表。

【讨论】:

在我的情况下,我希望将所有轮询消息作为一批,因为一次处理它们会更快。但是我学到了一些新东西,一种并行处理消息的方法。

以上是关于Spring Integration:如何一次处理多条消息?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Integration DSL 中为通道设置多个消息处理程序?

Spring Integration - 入站通道适配器执行下游通道并行处理

Spring Integration and AMQP:如何优雅地处理反序列化异常?

如何在处理过程中引发异常的 Spring Integration 中路由消息?

Spring-Integration Webflux 异常处理

Spring Integration 没有输出通道或回复通道