ActiveMQ 消费/转发来自另一个 ActiveMQ 实例的消息

Posted

技术标签:

【中文标题】ActiveMQ 消费/转发来自另一个 ActiveMQ 实例的消息【英文标题】:ActiveMQ consume/forward messages from another ActiveMQ instance 【发布时间】:2022-01-23 19:24:30 【问题描述】:

我有两个代理 A 和 B。如果我想将消息从 A 转发到 B,一切都很简单。我只需要像这样配置的代理中的网络连接器:

<networkConnectors>
    <networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
        <staticallyIncludedDestinations>
            <queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
        </staticallyIncludedDestinations>
    </networkConnector>
</networkConnectors>

如果我想从其他队列(我们将其命名为 QUEUE.TO.CONSUME)中使用来自代理 B 的消息,我只需要做同样的事情,但将双工设置为 true,然后只听 QUEUE.TO.CONSUME像这样在经纪人 A 上:

<networkConnectors>
    <networkConnector name="from-B-to-A" staticBridge="true" duplex="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
        <staticallyIncludedDestinations>
            <queue physicalName="QUEUE.TO.CONSUME" />
        </staticallyIncludedDestinations>
    </networkConnector>
    <networkConnector staticBridge="true" userName="user" password="pass" uri="static://(tcp://B:61616)">
        <staticallyIncludedDestinations>
            <queue physicalName="QUEUE.TO.FORWARD.MESSAGE" />
        </staticallyIncludedDestinations>
    </networkConnector>
</networkConnectors>

但它并没有像我预期的那样工作。似乎只有每秒钟的消息被转发,其余的只是丢失了。令人惊讶的是,它在代理 B QUEUE.TO.CONSUME 上创建了两个消费者,我假设其中一个消费者在不转发到代理 A 的情况下使用消息。如何在代理 A 上创建桥接器,允许我在不丢失消息的情况下使用来自代理 B 的消息。在代理 B 中创建网络连接器目前不是一个选项。 我也尝试过像这样创建入站队列桥:

<jmsBridgeConnectors>
    <jmsQueueConnector outboundQueueConnectionFactory="#remoteBroker" localUsername="user" localPassword="password">
        <inboundQueueBridges>
            <inboundQueueBridge inboundQueueName="QUEUE.TO.CONSUME" localQueueName="QUEUE.TO.CONSUME" />
        </inboundQueueBridges>
    </jmsQueueConnector>
</jmsBridgeConnectors>
...
</broker>
<bean id="remoteBroker" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover://(nio:B:61616)" />
        <property name="userName" value="user" />
        <property name="password" value="password" />
</bean>

此配置在远程代理 B 上创建消费者,但它不消耗任何消息,这些消息只是挂在队列中并且没有任何反应。 Broker A 仍然没有收到任何消息到它的本地队列。

【问题讨论】:

【参考方案1】:

好的,我想通了。我刚刚使用嵌入式 Apache Camel 来定义到远程主机的路由,它看起来像这样(conf 目录中的camel.xml):

<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:camel="http://camel.apache.org/schema/spring"
        xsi:schemaLocation="
     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <camelContext id="context" xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="remoteBroker:queue:QUEUE.TO.CONSUME"/>
            <to uri="localBroker:queue:QUEUE.TO.CONSUME"/>
        </route>
    </camelContext>

    <bean id="remoteBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://B:61616"/>
                <property name="userName" value="user"/>
                <property name="password" value="password"/>
            </bean>
        </property>
    </bean>

    <bean id="localBroker" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://localhost"/>
            </bean>
        </property>
    </bean>

</beans>

在哪里 localhost 我代理 A。在 activemq.xml 中:

<import resource="camel.xml"/>

【讨论】:

以上是关于ActiveMQ 消费/转发来自另一个 ActiveMQ 实例的消息的主要内容,如果未能解决你的问题,请参考以下文章

JmsListener 没有收到来自 Activemq 的消息

ActiveMQ——ActiveMQ的集群

独立消费者(SpringJMS)在ActiveMQ上创建了另一个队列

ActiveMQ(13):ActiveMQ的集群

ActiveMq Prefetch

ActiveMQ(11):集群下的消息回流功能