使用代理网络中的临时队列的请求/回复模式的 ActiveMQ/Camel 故障转移 - 无法发布到已删除的临时队列

Posted

技术标签:

【中文标题】使用代理网络中的临时队列的请求/回复模式的 ActiveMQ/Camel 故障转移 - 无法发布到已删除的临时队列【英文标题】:ActiveMQ/Camel failover for request/reply pattern using temp-queues in Network of brokers - can not publish to deleted temp-queue 【发布时间】:2016-06-30 13:53:29 【问题描述】:

由于扩展原因,我们最近从单个 ActiveMQ 代理切换到代理网络。虽然在大多数情况下,一切都按预期工作,但我们在白天重新部署代理后遇到了一个奇怪的问题:

首先,对于技术堆栈,我们使用 ActiveMQ 5.12.1 和 Camel 2.13.4 来集成 java 方法和 JMS 端点。代理端是一个代理网络,目前由 3 个成员组成,使用以下配置

<broker useJmx="$activemq.expose.jmx" persistent="false"
    brokerName="$activemq.brokerName" xmlns="http://activemq.apache.org/schema/core">
    <sslContext>
        <amq:sslContext keyStore="$activemq.broker.keyStore"
            keyStorePassword="$activemq.broker.keyStorePassword"
            trustStore="$activemq.broker.trustStore"
            trustStorePassword="$activemq.broker.trustStorePassword" />
    </sslContext>
    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="$activemq.memoryUsage" />
            </memoryUsage>
            <tempUsage>
                <tempUsage limit="$activemq.tempUsage" />
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue=">" enableAudit="false">
                    <networkBridgeFilterFactory>
                        <conditionalNetworkBridgeFilterFactory
                            replayWhenNoConsumers="true" />
                    </networkBridgeFilterFactory>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
    <networkConnectors>
        <networkConnector name="queues"
            uri="static:($activemq.otherBrokers)"
            networkTTL="2" dynamicOnly="true"
            decreaseNetworkConsumerPriority="true"
            conduitSubscriptions="false">
            <excludedDestinations>
                <topic physicalName=">" />
            </excludedDestinations>
        </networkConnector>
        <networkConnector name="topics"
            uri="static:($activemq.otherBrokers)"
            networkTTL="1" dynamicOnly="true"
            decreaseNetworkConsumerPriority="true"
            conduitSubscriptions="true">
            <excludedDestinations>
                <queue physicalName=">" />
            </excludedDestinations>
        </networkConnector>
    </networkConnectors>
    <transportConnectors>
        <transportConnector
            uri="$activemq.protocol$activemq.host:$activemq.tcp.port?needClientAuth=true"
            updateClusterClients="true" rebalanceClusterClients="true" />
        <transportConnector
            uri="$activemq.websocket.protocol$activemq.websocket.host:$activemq.websocket.port?needClientAuth=true"
            updateClusterClients="true" rebalanceClusterClients="true" />
    </transportConnectors>
</broker>

使用以下占位符值

activemq.tcp.port=9000
activemq.protocol=ssl://
activemq.brokerName=activemq-server1.com
activemq.expose.jmx=true
activemq.otherBrokers=ssl://server2.com:9000,ssl://server3.com:9000
activemq.websocket.port=9001
activemq.websocket.protocol=stomp+ssl://
activemq.websocket.host=server1.com
activemq.memoryUsage=1gb
activemq.tempUsage=1gb

在客户端正在使用以下骆驼配置

<bean id="xxx.activemq.redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="0" />
</bean>

<bean id="xxx.activemq.jmsConnectionFactory" class="org.apache.activemq.ActiveMQSslConnectionFactory">
    <property name="trustStore" value="$activemq.broker.trustStore" />
    <property name="trustStorePassword" value="$activemq.broker.trustStorePassword" />
    <property name="keyStore" value="$activemq.broker.keyStore" />
    <property name="keyStorePassword" value="$activemq.broker.keyStorePassword" />
    <property name="brokerURL" value="$activemq.broker.url" />
    <property name="redeliveryPolicy" ref="xxx.activemq.redeliveryPolicy" />
</bean>

<bean id="xxx.activemq.jmsConfiguration" class="org.apache.activemq.camel.component.ActiveMQConfiguration">
    <property name="receiveTimeout" value="6000" />
    <property name="connectionFactory" ref="xxx.activemq.pooledConnectionFactory" />
</bean>

<bean id="xxx.activemq.pooledConnectionFactory"
    class="org.apache.activemq.pool.PooledConnectionFactory"
    init-method="start" destroy-method="stop">
    <property name="maxConnections" value="8" />
    <property name="idleTimeout" value="0" />
    <property name="timeBetweenExpirationCheckMillis"
        value="10000" />
    <property name="connectionFactory"
        ref="xxx.activemq.jmsConnectionFactory" />
</bean>

<bean id="xxx.activemq.jms.abstractComponent" abstract="true"
    class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration"
        ref="xxx.activemq.jmsConfiguration" />
    <property name="connectionFactory"
        ref="xxx.activemq.pooledConnectionFactory" />
    <property name="allowNullBody" value="true" />
    <property name="transferException" value="true" />
    <property name="defaultTaskExecutorType"
        value="#T(org.apache.camel.component.jms.DefaultTaskExecutorType).ThreadPool" />
    <property name="requestTimeout" value="5000" />
</bean>

<bean id="xxx.activemq.jms.queue"
    parent="xxx.activemq.jms.abstractComponent">
    <property name="concurrentConsumers" value="2" />
    <property name="maxConcurrentConsumers" value="2" />
</bean>

连接网址为

activemq.broker.url=failover:(ssl://server1.com:9000,ssl://server2.com:9000,ssl://server3.com:9000)?randomize=true

请求/回复 EIP 是通过让生产者设置相应的 jmsReplyTo 标头并使用临时队列让骆驼默认为 InOut 来实现的。

在部署之前,所有消息传递都按预期工作,但是之后对于一些请求/回复队列,我们​​将开始在生产者端出现超时。日志中显示了以下条目:

生产者方面

Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-xxx-intranet-phs-49404-1457684675710-8-11 not received on destination: temp-queue://ID:xxx.intranet.phs-41986-1457684806758-1:3:1. 
Exchange[Message: BeanInvocation public abstract xxx.xxx.rapi.dto.RemoteDTO xxx.xxx.xxx.facade.RemoteFacade.findRemoteDTO(java.lang.String,java.lang.Long) with [xxx, 31333]]] 

在消费者方面:

Caused by: javax.jms.InvalidDestinationException: Cannot publish to a deleted Destination: temp-queue://ID:xxx.intranet.phs-41986-1457684806758-1:3:1 

从那时起,我们进行了一些研究,发现只要网络的任意代理关闭,问题就会出现,并且仅针对那些在关闭时打开临时队列以进行回复的生产者,并且他们故障转移到新的代理。之后,该生产者的问题将持续存在,直到他重新启动。一旦他在重新启动后重新加入,一切都会恢复正常。这个问题也在grokbase 以及activemq-failover-with-temporary-queues-on-a-network-of-brokers 和activemq-how-to-handle-broker-failovers-while-using-temporary-queues 的两个主题上进行了描述。我们已经尝试了activemq-how-to-handle-broker-failovers-while-using-temporary-queues 中给出的一个解决方案来设置缓存超时,但没有从中得到任何结果,另一个建议的选项来为客户端打开咨询侦听并不是我们设置中的真正选项,因为我们想要使用诸如 clusterRebalancing 之类的功能,以便在运行时更轻松地向网络添加额外的代理。

我们还在骆驼和 ActiveMQ 方面发现了一些 JIRA 问题,例如 CAMEL-3193,这些问题描述了这个问题,并且显然针对我们更新的版本修复了这些问题,所以我们很困惑。目前我们正在考虑切换到临时队列而不是独占回复队列来解决这个问题,但首先想问一下我们是否只是在某个地方遗漏了一些配置。

如果您需要任何其他信息,请尽管询问!

【问题讨论】:

【参考方案1】:

参见AMQ-5469:临时队列在建立它们的连接断开时被删除。当您关闭代理时会发生这种情况。

解决方案可以是以下两种选择之一:

客户端在重新建立代理连接时重新创建临时队列 不要使用临时队列,使用持久队列

【讨论】:

以上是关于使用代理网络中的临时队列的请求/回复模式的 ActiveMQ/Camel 故障转移 - 无法发布到已删除的临时队列的主要内容,如果未能解决你的问题,请参考以下文章

Java EE 容器中的同步请求-回复模式

Kafka 主题过滤与微服务请求/回复模式的临时主题

将“回复”队列中的消息复制到另一个队列

使用 ActiveMQ、Camel 和 Spring 实现请求-回复模式

Servicestack RabbitMQ:当 RabbitMqProducer 无法在 RPC 模式中重新声明临时队列时,无限循环填充死信队列

使用 JMS/ActiveMQ 并发同步请求-回复 - 模式/库?