Spring Integration:如何增加对传入消息的处理

Posted

技术标签:

【中文标题】Spring Integration:如何增加对传入消息的处理【英文标题】:Spring Integration: How to increase processing of incoming messages 【发布时间】:2018-02-01 16:47:06 【问题描述】:

我正在开发一个 Spring 应用程序,它将每分钟接收大约 500 条 xml 消息。下面的 xml 配置只允许每分钟处理大约 60 条消息,其余消息存储在队列中(持久化在 DB 中)并以每分钟 60 条消息的速度检索。

尝试从多个来源阅读文档,但仍不清楚 Poller 与任务执行器相结合的作用。我对目前每分钟处理 60 条消息的理解是因为轮询器配置中的“固定延迟”值设置为 10(因此它将在 1 分钟内轮询 6 次)和“max-messages-per-poll”设置为 10,因此每分钟处理 6x10=60 条消息。

如果我的理解不正确,请指教,并帮助修改xml配置以实现对传入消息的处理速度更快。

task executor 的作用也不清楚——是不是说 pool-size="50" 会允许 50 个线程并行运行来处理 poller 轮询的消息?

我想要的全部是:

    JdbcChannelMessageStore 用于将传入的 xml 消息存储在数据库 (INT_CHANNEL_MESSAGE) 表中。这是必需的,因此在服务器重启的情况下,消息仍然存储在表中并且不会丢失。 要并行执行的传入消息,但数量受控/有限。根据系统处理这些消息的能力,我想限制系统应该并行处理多少条消息。 由于此配置将在集群中的多台服务器上使用,因此任何服务器都可以接收任何消息,因此不会导致两台服务器处理同一消息时发生任何冲突。希望这由 Spring Integration 处理。

抱歉,如果这已在其他地方得到回答,但在阅读了许多帖子后,我仍然不明白这是如何工作的。

提前致谢。

<!-- Message Store configuration start -->              

    <!-- JDBC message store configuration -->
    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
        <property name="dataSource" ref="dataSource"/>
        <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
        <property name="region" value="TX_TIMEOUT"/>
        <property name="usingIdCache" value="true"/>
    </bean>

    <bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.mysqlChannelMessageStoreQueryProvider" />        

<int:transaction-synchronization-factory
    id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" />  

<int:poller id="messageStorePoller" fixed-delay="10"
    receive-timeout="500" max-messages-per-poll="10" task-executor="pool"
    default="true" time-unit="SECONDS">
    <int:transactional propagation="REQUIRED"
        synchronization-factory="syncFactory" isolation="READ_COMMITTED"
        transaction-manager="transactionManager" /> 
</int:poller>

<bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    <!--  1)        Store the message in  persistent message store -->
    <int:channel id="incomingXmlProcessingChannel">
         <int:queue message-store= "store" />
    </int:channel> 

    <!-- 2) Check in, Enrich the headers, Check out -->
    <!-- (This is the entry point for WebService requests) -->
    <int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel">
        <int:claim-check-in message-store="simpleMessageStore" />
        <int:header-enricher >
            <int:header name="CLAIM_CHECK_ID" expression="payload"/>
            <int:header name="MESSAGE_ID" expression="headers.id" />
            <int:header name="IMPORT_ID" value="XML_IMPORT"/>
        </int:header-enricher>
        <int:claim-check-out message-store="simpleMessageStore" />          
    </int:chain>

在 Artem 回复后添加:

感谢阿尔乔姆。因此,在 10 秒的固定延迟后发生的每次轮询(根据上面的配置),任务执行器将检查任务队列,如果可能(并且需要)启动一个新任务?根据“maxMessagesPerPoll”配置,每个 pollingTask(线程)将从消息存储(队列)接收“10”条消息。

为了获得更高的传入消息处理时间,我应该减少poller上的fixedDelay,以便任务执行器可以启动更多线程吗?如果我将 fixedDelay 设置为 2 秒,将启动一个新线程来执行 10 条消息,并且一分钟内将启动大约 30 个这样的线程,在一分钟内处理“大约”300 条传入消息。

很抱歉在一个问题中问的太多 - 只是想解释完整的问题。

【问题讨论】:

【参考方案1】:

这个类背后的主要逻辑:

    private final class Poller implements Runnable 

    private final Callable<Boolean> pollingTask;

    Poller(Callable<Boolean> pollingTask) 
        this.pollingTask = pollingTask;
    

    @Override
    public void run() 
        AbstractPollingEndpoint.this.taskExecutor.execute(() -> 
            int count = 0;
            while (AbstractPollingEndpoint.this.initialized
                    && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
                    || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) 
                try 
                    if (!Poller.this.pollingTask.call()) 
                        break;
                    
                    count++;
                
                catch (Exception e) 
                    if (e instanceof MessagingException) 
                        throw (MessagingException) e;
                    
                    else 
                        Message<?> failedMessage = null;
                        if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) 
                            Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
                            if (resource instanceof IntegrationResourceHolder) 
                                failedMessage = ((IntegrationResourceHolder) resource).getMessage();
                            
                        
                        throw new MessagingException(failedMessage, e);
                    
                
                finally 
                    if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) 
                        Object resource = getResourceToBind();
                        if (TransactionSynchronizationManager.hasResource(resource)) 
                            TransactionSynchronizationManager.unbindResource(resource);
                        
                    
                
            
        );
    


如您所见,taskExecutor 负责将pollingTask 旋转到一个线程中的maxMessagesPerPoll。如果当前轮询任务对于新的调度来说太长,那么池中的其他线程将被涉及。但是一次轮询中的所有消息都在同一个线程中处理,而不是并行处理。

这就是它的工作原理。由于您在一个 SO 问题中问的太多,我希望这些信息足以帮助您了解下一步。

【讨论】:

谢谢@Artem,在您回复后,我在上面的问题中添加了一个问题。在查看 AbstractPollingEndpoint 类后,它确实增加了我的理解,但不确定我是否正确理解了模式。 是的,你的理解是正确的。您也可以考虑将CallersRunPolicy 用于您的任务执行器,这样当池中没有线程时,调度器的线程将执行轮询周期。但与此同时,在该线程空闲之前,不会再启动新的轮询周期。

以上是关于Spring Integration:如何增加对传入消息的处理的主要内容,如果未能解决你的问题,请参考以下文章

Shiro系列之Shiro+Spring MVC整合(Integration)

Shiro系列之Shiro+Spring MVC整合(Integration)

Shiro系列之Shiro+Spring MVC整合(Integration)

Shiro系列之Shiro+Spring MVC整合(Integration)

Spring Integration:如何一次处理多条消息?

如何在 Spring Integration 中动态注册 Feed Inbound Adapter?