Camel Split and Aggregate failing because messages going to multiple concurrent consumers

Posted: 2014-10-15 15:13:54

【Problem Description】:

I have a simple Camel route that takes a list of items, splits them, sends each element to an MQ node for processing, and then joins them back together via an aggregator.

It is very close to the Composed Message Processor: http://camel.apache.org/composed-message-processor.html
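For comparison, that page also describes a splitter-only variant in which the split itself re-aggregates the replies once the last piece returns. A minimal sketch of that shape, reusing the beans from the route below but with an illustrative direct: endpoint that is not part of our project:

<route>
    <from uri="direct:registration" />  <!-- illustrative endpoint -->
    <split strategyRef="groupedExchangeAggregator">
        <method ref="requestSplitter" method="split" />
        <!-- each split piece is processed here -->
        <bean ref="actionProcessor" method="process" />
    </split>
    <!-- after </split> the route continues with the single aggregated exchange -->
    <bean ref="actionProcessor" method="updateRegistrationRequestStatus" />
</route>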

However, we noticed that after the split Camel creates multiple concurrent consumers (or exchanges?). Because the messages are being sent to multiple consumers, they never complete.

List: 1, 2, 3, 4

Split: amq::process_each_item

Aggregate:

[Camel (camel-3) thread #41 - Aggregating 1 - Waiting on 3 more items
[Camel (camel-1) thread #16 - Aggregating 2 - Waiting on 3 more items
[Camel (camel-3) thread #49 - Aggregating 3 - Waiting on 2 more items
[Camel (camel-1) thread #15 - Aggregating 4 - Waiting on 2 more items

So Camel has spawned two aggregators, each waiting for 4 items, but each of them only ever receives two.

The Camel route:

<camelContext xmlns="http://camel.apache.org/schema/spring">

        <route> <!-- This route splits the reg request into its items. Adding needed info to the message header.  -->
            <from uri="activemq:registration.splitByItemQueue" />  <!-- pick up the reg req -->
            <setHeader headerName="regReqId"> <!-- Need to store the Reg Req in the header  -->
                <simple>${body.registrationRequest.id}</simple>
            </setHeader>

            <split parallelProcessing="false" strategyRef="groupedExchangeAggregator"> <!-- Split the RegRequestInfo into its individual requestItems (add, drop, etc) -->
                <method ref="requestSplitter"  method="split" />   <!-- does the actual splitting -->
                <setHeader headerName="JMSXGroupID"> <!-- This is CRITICAL. It is how we ensure valid seat check counts without db locking -->
                    <simple>FOID=${body.formatOfferingId}</simple>  <!-- grouping on the foid -->
                </setHeader>
                <to uri="activemq:registration.lprActionQueue"/> <!-- send to queue's for processing-->
            </split>
        </route>

        <route>    <!-- performs the registration + seat check -->
            <from uri="activemq:registration.lprActionQueue" />

            <bean ref="actionProcessor" method="process"/> <!-- go to the java code that makes all the decisions -->
            <to uri="activemq:registration.regReqItemJoinQueue"/> <!-- send to join queue's for final processing-->
        </route>

        <route>    <!-- This route joins items from the reg req item split. Once all items have completed, update state-->
            <from uri="activemq:registration.regReqItemJoinQueue" />  <!-- Every Reg Req Item will come here-->
            <aggregate strategyRef="groupedExchangeAggregator" ignoreInvalidCorrelationKeys="false" completionFromBatchConsumer="true"> <!-- take all the Reg Req Items an join them to their req -->
                <correlationExpression>
                    <header>regReqId</header> <!-- correlate on the regReqId we stored in the header -->
                </correlationExpression>

                <bean ref="actionProcessor" method="updateRegistrationRequestStatus"/> <!-- update status -->                   
            </aggregate>
        </route>
</camelContext>

<bean id="groupedExchangeAggregator" class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy" />

On my local machine the above works fine, but when we deploy to our test server, half the messages go to one Camel aggregator and half to the other, so neither ever completes. Note that in the configuration below we set Camel's concurrent consumers to 1.
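(For reference, the same limit can also be pinned directly on a consuming endpoint through the JMS component's URI options; a sketch only, not something we have deployed:)

<from uri="activemq:registration.regReqItemJoinQueue?concurrentConsumers=1&amp;maxConcurrentConsumers=1" />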

Here is the Camel/ActiveMQ configuration:

    <amq:broker useJmx="false" persistent="false">
        <amq:plugins>
            <amq:statisticsBrokerPlugin />
        </amq:plugins>
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:0" />
        </amq:transportConnectors>
    </amq:broker>

    <!-- Basic AMQ connection factory -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://localhost" />

    <!-- Wraps the AMQ connection factory in Spring's caching (ie: pooled) factory
         From the AMQ "Spring Support"-page: "You can use the PooledConnectionFactory for efficient pooling... or you
         can use the Spring JMS CachingConnectionFactory to achieve the same effect."
         See "Consuming JMS from inside Spring" at http://activemq.apache.org/spring-support.html
         Also see http://codedependents.com/2010/07/14/connectionfactories-and-caching-with-spring-and-activemq/

         Note: there are pros/cons to using Spring's caching factory vs Apache's PooledConnectionFactory; but, until
         we have more explicit reasons to favor one over the other, Spring's is less tightly-coupled to a specific
         AMQP-implementation.
         See http://***.com/a/19594974
    -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
    </bean>

    <bean id="jmsConfig"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <bean id="activemq"
          class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
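For completeness, the Apache-side alternative that the comment above weighs against Spring's CachingConnectionFactory would be wired roughly like this (a sketch of the usual pattern, not part of our deployed configuration):

<bean id="pooledConnectionFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
    <!-- wraps the plain AMQ factory with connection/session pooling -->
    <property name="connectionFactory" ref="amqConnectionFactory" />
    <property name="maxConnections" value="1" />
</bean>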

【Comments】:

【Answer 1】:

It turns out we had another Spring context/servlet importing our configuration. We believe that was the problem.
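For example, a typical way this happens is when the root web application context and a DispatcherServlet context both load the same Camel XML, which starts two CamelContexts (and therefore two competing aggregator consumers). A hypothetical web.xml shape that would produce exactly that, with an illustrative file name:

<!-- root context loads the Camel routes... -->
<context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:camel-routes.xml</param-value>
</context-param>
<listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<!-- ...and the servlet context accidentally imports the same file again -->
<servlet>
    <servlet-name>dispatcher</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:camel-routes.xml</param-value>
    </init-param>
</servlet>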

【Discussion】:
