ActiveMQ Artemis colocated replication not working


Posted: 2020-03-26 19:16:45

Question:

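I have an ActiveMQ Artemis cluster, and I configured its brokers for colocated replication over the network.

Here are the configurations of the two brokers: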

Broker1:

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:xi="http://www.w3.org/2001/XInclude"
           xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="urn:activemq:core ">

  <name>10.1.1.130</name>


  <persistence-enabled>true</persistence-enabled>

  <!-- this could be ASYNCIO, MAPPED, NIO
       ASYNCIO: Linux Libaio
       MAPPED: mmap files
       NIO: Plain Java Files
   -->
  <journal-type>ASYNCIO</journal-type>

  <paging-directory>data/paging</paging-directory>

  <bindings-directory>data/bindings</bindings-directory>

  <journal-directory>data/journal</journal-directory>

  <large-messages-directory>data/large-messages</large-messages-directory>

  <journal-datasync>true</journal-datasync>

  <journal-min-files>2</journal-min-files>

  <journal-pool-files>10</journal-pool-files>

  <journal-device-block-size>4096</journal-device-block-size>

  <journal-file-size>10M</journal-file-size>

  <journal-buffer-timeout>23940000</journal-buffer-timeout>

  <journal-max-io>1</journal-max-io>

  <!-- how often we are looking for how many bytes are being used on the disk in ms -->
  <disk-scan-period>5000</disk-scan-period>

  <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
       that won't support flow control. -->
  <max-disk-usage>90</max-disk-usage>

  <!-- should the broker detect dead locks and other issues -->
  <critical-analyzer>true</critical-analyzer>

  <critical-analyzer-timeout>120000</critical-analyzer-timeout>

  <critical-analyzer-check-period>60000</critical-analyzer-check-period>

  <critical-analyzer-policy>HALT</critical-analyzer-policy>


 <!-- Clustering configuration -->
  <connectors>
     <connector name="netty-connector">tcp://localhost:61616</connector>
     <!-- connector to the server2 -->
     <connector name="server2-connector">tcp://10.1.1.131:61616</connector>
  </connectors>
  <ha-policy>
     <replication>
        <colocated>
           <backup-request-retries>-1</backup-request-retries>
           <backup-request-retry-interval>2000</backup-request-retry-interval>
           <excludes>
              <connector-ref>server2-connector</connector-ref>
              <connector-ref>netty-connector</connector-ref>
           </excludes>
           <max-backups>1</max-backups>
           <request-backup>true</request-backup>
           <master> 
           </master>
           <slave>
           </slave>
        </colocated>
     </replication>
  </ha-policy>
 <cluster-user>ACTIVEMQ.CLUSTER.ADMIN.USER</cluster-user>
 <cluster-password>123456</cluster-password>
 <cluster-connections>
     <cluster-connection name="my-cluster">
        <connector-ref>netty-connector</connector-ref>
        <retry-interval>500</retry-interval>
        <use-duplicate-detection>true</use-duplicate-detection>
        <message-load-balancing>STRICT</message-load-balancing>
        <max-hops>1</max-hops>
        <static-connectors>
           <connector-ref>server2-connector</connector-ref>
        </static-connectors>
     </cluster-connection>
  </cluster-connections>

  <acceptors>
     <!-- Acceptor for every supported protocol -->
     <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
     <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- STOMP Acceptor. -->
     <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

     <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
     <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

     <!-- MQTT Acceptor -->
     <acceptor name="mqtt">tcp://0.0.0.0:4883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

  </acceptors>


  <security-settings>
     <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq"/>
        <permission type="send" roles="amq"/>
        <!-- we need this otherwise ./artemis data imp wouldn't work -->
        <permission type="manage" roles="amq"/>
     </security-setting>
  </security-settings>
</core>
</configuration>

Broker2:

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:xi="http://www.w3.org/2001/XInclude"
           xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="urn:activemq:core ">


  <name>10.1.1.131</name>


  <persistence-enabled>true</persistence-enabled>

  <!-- this could be ASYNCIO, MAPPED, NIO
       ASYNCIO: Linux Libaio
       MAPPED: mmap files
       NIO: Plain Java Files
   -->
  <journal-type>ASYNCIO</journal-type>

  <paging-directory>data/paging</paging-directory>

  <bindings-directory>data/bindings</bindings-directory>

  <journal-directory>data/journal</journal-directory>

  <large-messages-directory>data/large-messages</large-messages-directory>

  <journal-datasync>true</journal-datasync>

  <journal-min-files>2</journal-min-files>

  <journal-pool-files>10</journal-pool-files>

  <journal-device-block-size>4096</journal-device-block-size>

  <journal-file-size>10M</journal-file-size>

  <journal-buffer-timeout>23940000</journal-buffer-timeout>

  <journal-max-io>1</journal-max-io>

  <!-- how often we are looking for how many bytes are being used on the disk in ms -->
  <disk-scan-period>5000</disk-scan-period>

  <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
       that won't support flow control. -->
  <max-disk-usage>90</max-disk-usage>

  <!-- should the broker detect dead locks and other issues -->
  <critical-analyzer>true</critical-analyzer>

  <critical-analyzer-timeout>120000</critical-analyzer-timeout>

  <critical-analyzer-check-period>60000</critical-analyzer-check-period>

  <critical-analyzer-policy>HALT</critical-analyzer-policy>


 <!-- Clustering configuration -->
  <connectors>
     <connector name="netty-connector">tcp://localhost:61616</connector>
     <!-- connector to the server1 -->
     <connector name="server1-connector">tcp://10.1.1.130:61616</connector>
  </connectors>
  <ha-policy>
     <replication>
        <colocated>
           <backup-request-retries>-1</backup-request-retries>
           <backup-request-retry-interval>2000</backup-request-retry-interval>
           <excludes>
              <connector-ref>server1-connector</connector-ref>
              <connector-ref>netty-connector</connector-ref>
           </excludes>
           <max-backups>1</max-backups>
           <request-backup>true</request-backup>
           <master> 
           </master>
           <slave>
           </slave>
        </colocated>
     </replication>
  </ha-policy>
 <cluster-user>ACTIVEMQ.CLUSTER.ADMIN.USER</cluster-user>
 <cluster-password>123456</cluster-password>
 <cluster-connections>
     <cluster-connection name="my-cluster">
        <connector-ref>netty-connector</connector-ref>
        <retry-interval>500</retry-interval>
        <use-duplicate-detection>true</use-duplicate-detection>
        <message-load-balancing>STRICT</message-load-balancing>
        <max-hops>1</max-hops>
        <static-connectors>
           <connector-ref>server1-connector</connector-ref>
        </static-connectors>
     </cluster-connection>
  </cluster-connections>

  <acceptors>
     <!-- Acceptor for every supported protocol -->
     <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
     <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- STOMP Acceptor. -->
     <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

     <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
     <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

     <!-- MQTT Acceptor -->
     <acceptor name="mqtt">tcp://0.0.0.0:4883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

  </acceptors>


  <security-settings>
     <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq"/>
        <permission type="send" roles="amq"/>
        <!-- we need this otherwise ./artemis data imp wouldn't work -->
        <permission type="manage" roles="amq"/>
     </security-setting>
  </security-settings>
</core>
</configuration>

Here is the Spring Boot Java code that sends the messages:

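// Assumed: the Artemis JMS client (artemis-jms-client), not the ActiveMQ Classic one
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

// BaseObject is the poster's own base class (not shown)
@Configuration
public class ArtemisProducerConfig extends BaseObject {

    // Broker URL injected from the "artemis.broker-url" property
    @Value("${artemis.broker-url}")
    private String brokerUrl;

    // JMS connection factory pointing at brokerUrl
    @Bean
    public ActiveMQConnectionFactory senderActiveMQConnectionFactory() {
        return new ActiveMQConnectionFactory(brokerUrl);
    }

    // Caches sessions and producers so each send does not open a new connection
    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    // Sends persistent messages; explicit QoS must be enabled for setDeliveryPersistent to apply
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate(cachingConnectionFactory());
        template.setExplicitQosEnabled(true);
        template.setDeliveryPersistent(true);
        return template;
    }
}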



// Send to queue "test.queue" on address "test.address" (fully qualified queue name)
// and copy the caller's correlation ID onto the outgoing JMS message.
jmsTemplate.convertAndSend("test.address::test.queue", inputData.getData(), message -> {
    message.setJMSCorrelationID(inputData.getCorrID());
    return message;
});
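The configuration class above reads a single `artemis.broker-url`. For a client to reconnect to the colocated backup after its live broker dies, the Artemis core client URL is typically given both brokers plus `ha=true` (and `reconnectAttempts=-1` to retry indefinitely). The following is a stdlib-only sketch that assembles such a string from the hosts and port in the posted configs. `HaBrokerUrl` and `haBrokerUrl` are hypothetical names, and the sketch assumes the Artemis JMS client is in use.

```java
import java.util.List;
import java.util.stream.Collectors;

public class HaBrokerUrl {

    /**
     * Hypothetical helper: builds "(tcp://h1:port,tcp://h2:port)?ha=true&reconnectAttempts=-1",
     * the multi-host URL form accepted by the Artemis core JMS client.
     */
    static String haBrokerUrl(List<String> hosts, int port) {
        String endpoints = hosts.stream()
                .map(host -> "tcp://" + host + ":" + port)
                .collect(Collectors.joining(","));
        // ha=true lets the client follow the cluster topology to a backup on failover;
        // reconnectAttempts=-1 means keep retrying forever.
        return "(" + endpoints + ")?ha=true&reconnectAttempts=-1";
    }

    public static void main(String[] args) {
        // Hosts and port taken from the two broker configurations above
        System.out.println(haBrokerUrl(List.of("10.1.1.130", "10.1.1.131"), 61616));
    }
}
```

The printed string could then be used as the value of `artemis.broker-url`.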

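Messages are sent to Broker1 successfully, and I can see them in the web console at http://10.1.1.130:8161. However, no messages show up on Broker2. My understanding is that messages must be backed up to Broker2 to achieve HA.

Can someone give me an example of configuring Artemis for colocated replication over the network? Thanks!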


Answer 1:

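ActiveMQ Artemis uses an active/passive scheme for high availability. In the replication use case, an active "live" broker has a passive "slave" broker to which it replicates messages. In a replicated and colocated configuration there are actually 2 brokers per JVM (i.e. two brokers are colocated in the same JVM): one broker is live, and the other acts as the backup for another broker in the cluster. You will not "see" the messages on the slave until the live broker fails, at which point the slave activates and becomes the master.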
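The behavior described above can be illustrated with a toy model. This is not Artemis code, and the names (`ColocatedReplicationModel`, `sendToNode1`, `failNode1`) are hypothetical. Each JVM runs its own live broker plus a passive backup of the other node's live broker. A message sent to Broker1 is copied into the backup inside Broker2's JVM, but it is not browsable there until Broker1 fails.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT the Artemis API) of a two-node colocated replication pair. */
public class ColocatedReplicationModel {

    /** A broker instance: either serving clients (live) or passively holding a replica (backup). */
    static final class Broker {
        final String name;
        boolean active;
        final List<String> journal = new ArrayList<>();

        Broker(String name, boolean active) {
            this.name = name;
            this.active = active;
        }

        /** Clients only see messages on an active broker; a passive backup shows nothing. */
        List<String> browse() {
            return active ? List.copyOf(journal) : List.of();
        }
    }

    /** One JVM in colocated mode: its own live broker plus a backup of the other node's live broker. */
    static final class Jvm {
        final Broker live;
        final Broker backupForOther;

        Jvm(String host, String otherHost) {
            live = new Broker(host + "/live", true);
            backupForOther = new Broker(otherHost + "/backup", false);
        }
    }

    final Jvm node1 = new Jvm("10.1.1.130", "10.1.1.131");
    final Jvm node2 = new Jvm("10.1.1.131", "10.1.1.130");

    /** Writing to node1's live broker is replicated into the backup hosted by node2. */
    void sendToNode1(String msg) {
        node1.live.journal.add(msg);
        node2.backupForOther.journal.add(msg);
    }

    /** When node1 dies, node2's backup (the replica of node1) activates. */
    void failNode1() {
        node1.live.active = false;
        node2.backupForOther.active = true;
    }

    public static void main(String[] args) {
        ColocatedReplicationModel m = new ColocatedReplicationModel();
        m.sendToNode1("order-1");
        System.out.println(m.node2.live.browse());           // Broker2's own live broker never gets it
        System.out.println(m.node2.backupForOther.browse()); // the replica exists but is passive
        m.failNode1();
        System.out.println(m.node2.backupForOther.browse()); // visible only after failover
    }
}
```

Running `main` prints `[]`, `[]`, then `[order-1]`, which matches what the asker observed in the web console before any failure.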

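To confirm that replication is working as expected, you can check the log files on the master and the slave. First, the slave will log something like this: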

INFO  [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version X.X.X [null] started, waiting live to fail before it gets active

Then the live broker will log something like this:

INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/journal/activemq-data-2.amq (size=10,485,760) to replica.
INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/bindings/activemq-bindings-3.bindings (size=1,048,576) to replica.
INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/bindings/activemq-bindings-2.bindings (size=1,048,576) to replica.
INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=8d7477d0-1518-11ea-abd1-a0afbd82eaba is synchronized with live-server.

And finally, the slave will log:

INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced
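This log check can be automated. The following is a minimal sketch, assuming you have the backup's and the live broker's log lines as strings. It only matches the three AMQ codes quoted above (AMQ221109, AMQ221024, AMQ221031) and requires them to appear in that order of stages. The class, enum, and method names are hypothetical.

```java
import java.util.List;

/** Hypothetical helper that infers replication progress from the AMQ codes quoted above. */
public class ReplicationLogCheck {

    enum State { NOT_STARTED, BACKUP_WAITING, SYNCHRONIZED, ANNOUNCED }

    static boolean anyContains(List<String> lines, String code) {
        return lines.stream().anyMatch(line -> line.contains(code));
    }

    /** Each stage only counts if the previous stage was also observed. */
    static State check(List<String> backupLog, List<String> liveLog) {
        boolean waiting = anyContains(backupLog, "AMQ221109");              // backup started, waiting for live to fail
        boolean synced = waiting && anyContains(liveLog, "AMQ221024");      // live: backup synchronized
        boolean announced = synced && anyContains(backupLog, "AMQ221031");  // backup announced
        if (announced) return State.ANNOUNCED;
        if (synced) return State.SYNCHRONIZED;
        if (waiting) return State.BACKUP_WAITING;
        return State.NOT_STARTED;
    }

    public static void main(String[] args) {
        List<String> backup = List.of(
            "INFO  [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version X.X.X [null] started, waiting live to fail before it gets active",
            "INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced");
        List<String> live = List.of(
            "INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=8d7477d0-1518-11ea-abd1-a0afbd82eaba is synchronized with live-server.");
        System.out.println(check(backup, live)); // ANNOUNCED
    }
}
```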

Comments:

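Thanks for your answer. I can see the log file already has the line you mentioned: "AMQ221031: backup announced". But as I understand it, my configuration is colocated, which means Broker1 is the live backup of Broker2 and vice versa. To ensure availability, all messages need to be replicated to all available brokers, because if messages are not replicated to the other broker, they could be lost if a server suddenly dies. This behavior does not guarantee availability.

In our microservice system, a Producer may send messages to Broker1 while a Consumer receives from Broker2. Right now, that Consumer may never receive the messages, because they are not replicated to Broker2.

I updated my answer to address your comments; I hope that clarifies things. I recommend running some functional tests (i.e. set up a cluster with colocated HA and a few clients, then kill one of the brokers) to see for yourself whether replication works as expected.

You mention a use case where a producer sends messages to Broker1 and a consumer tries to receive them from Broker2 but cannot, supposedly because replication is not working. That is not what replication does. That is a use case for message load balancing or redistribution, both of which are discussed in the clustering documentation.

Message replication and redistribution are completely different concepts in ActiveMQ Artemis. The documentation covers support for filter/selector-based redistribution in great detail. See the section titled "Redistribution and filters (selectors)".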
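The difference between replication and redistribution in the producer-on-Broker1 / consumer-on-Broker2 case can be sketched with another toy model. This is not the Artemis API, and `RedistributionModel`, `Node`, and `redistribute` are hypothetical names. Per the Artemis clustering documentation, real redistribution is enabled with the `redistribution-delay` address setting and works with the cluster connection's `message-load-balancing` set to `ON_DEMAND`; the configs in the question use `STRICT`. The model only shows the idea: messages move from a queue with no local consumers to the same queue on a node that has consumers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (NOT the Artemis API) of message redistribution between two clustered nodes. */
public class RedistributionModel {

    /** One clustered node holding its local instance of test.queue. */
    static final class Node {
        final String name;
        final Deque<String> queue = new ArrayDeque<>();
        int consumers;

        Node(String name) {
            this.name = name;
        }
    }

    /**
     * Redistribution: if "from" has no local consumers and "to" has consumers on the same queue,
     * forward the messages. Replication never does this; it only copies data to a passive backup.
     */
    static void redistribute(Node from, Node to) {
        if (from.consumers == 0 && to.consumers > 0) {
            while (!from.queue.isEmpty()) {
                to.queue.addLast(from.queue.pollFirst());
            }
        }
    }

    public static void main(String[] args) {
        Node broker1 = new Node("Broker1");
        Node broker2 = new Node("Broker2");
        broker1.queue.add("order-1"); // producer connected to Broker1
        broker2.consumers = 1;        // consumer connected to Broker2

        System.out.println(broker2.queue.size());      // 0: nothing has reached Broker2 yet
        redistribute(broker1, broker2);
        System.out.println(broker2.queue.pollFirst()); // order-1
    }
}
```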
