ActiveMQ 集群和主从

Posted 偶尔发呆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ 集群和主从相关的知识,希望对你有一定的参考价值。

举例说明:
假设有 3 个 broker 节点,分别是61616,61618, 61620,其中 61616 和 61618 组成主、从节点,而 61616(或61618)和 61620 构成集群。
61616 和 61618 使用 jdbc 持久化,61620 使用 kahaDB。

这样混合配置:能更好地理解主从和集群的区别,61616 和 61618 在同一时刻只有一个 broker 提供服务,而集群是同时在提供服务,消息在集群的节点之间流转。

61616 配置:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://activemq.apache.org/schema/core 
        http://activemq.apache.org/schema/core/activemq-core.xsd">
  
  <broker useJmx="false" brokerName="jdbcBroker61616" xmlns="http://activemq.apache.org/schema/core">
    <persistenceAdapter>
       <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
    </persistenceAdapter>

    <networkConnectors>
      <networkConnector uri="multicast://default"
        dynamicOnly="true" 
        networkTTL="3" 
        prefetchSize="1" 
        decreaseNetworkConsumerPriority="true" />
    </networkConnectors>
        
    <transportConnectors>
       <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default"/>
    </transportConnectors>
  </broker>

  <!-- MySql DataSource Sample Setup -->
  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://192.168.40.8:3306/db_zhang?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
</beans>

61618 配置:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
                http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                http://activemq.apache.org/schema/core 
                http://activemq.apache.org/schema/core/activemq-core.xsd">
  
  <broker useJmx="false" brokerName="jdbcBroker61618" xmlns="http://activemq.apache.org/schema/core">
    <persistenceAdapter>
       <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
    </persistenceAdapter>
    <networkConnectors>
      <networkConnector uri="multicast://default"
        dynamicOnly="true" 
        networkTTL="3" 
        prefetchSize="1" 
        decreaseNetworkConsumerPriority="true" />
    </networkConnectors>
        
    <transportConnectors>
       <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default"/>
    </transportConnectors>
  </broker>
 
  <!-- MySql DataSource Sample Setup -->
  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://192.168.40.8:3306/db_zhang?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
</beans>

61620 配置:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations" value="classpath:mq.properties"/>
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="dynamic-broker2" dataDirectory="${activemq.base}/data">

        <!-- Destination specific policies using destination names or wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
                        <deadLetterStrategy>
                          <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
                        </deadLetterStrategy>
                    </policyEntry>
                    <policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
            <managementContext createConnector="true" connectorPort="1100"/>
        </managementContext>

        <!--
            Configure network connector to use multicast protocol
            For more information, see
            
            http://activemq.apache.org/multicast-transport-reference.html
        -->
        <networkConnectors>
          <networkConnector uri="multicast://default"
            dynamicOnly="true" 
            networkTTL="3" 
            prefetchSize="1" 
            decreaseNetworkConsumerPriority="true" />
        </networkConnectors>
        
    
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/dynamic-broker2/kahadb" />
        </persistenceAdapter>
        
        <!--  The maximum amount of space the broker will use before slowing down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!-- 
            The transport connectors ActiveMQ will listen to
            Configure discovery URI to use multicast protocol
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61620" discoveryUri="multicast://default" />
        </transportConnectors>

    </broker>
</beans>

其中一个 producer 配置:

new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61618)");

其中一个 consumer 配置:

new ActiveMQConnectionFactory("tcp://localhost:61620");

producer 往(61616主,61618从)发送消息,consumer 可以从 61620 获取消息。当 61616 下线后,61618 会成为主,并和 61620 构成集群。

 

以上是关于ActiveMQ 集群和主从的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ集群

基于zookeeper的activemq的主从集群配置

ActiveMQ的集群与高可用实践

四ActiveMQ Zookeeper集群

消息中间件一 ActiveMQ简单使用和安装

LinuxWindows安装ActiveMQ