ActiveMQ 集群和主从
Posted 偶尔发呆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ 集群和主从相关的知识,希望对你有一定的参考价值。
举例说明:
假设有 3 个 broker 节点,分别是61616,61618, 61620,其中 61616 和 61618 组成主、从节点,而 61616(或61618)和 61620 构成集群。
61616 和 61618 使用 jdbc 持久化,61620 使用 kahaDB。
这样混合配置:能更好地理解主从和集群的区别,61616 和 61618 在同一时刻只有一个 broker 提供服务,而集群是同时在提供服务,消息在集群的节点之间流转。
61616 配置:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <broker useJmx="false" brokerName="jdbcBroker61616" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> </persistenceAdapter> <networkConnectors> <networkConnector uri="multicast://default" dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true" /> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default"/> </transportConnectors> </broker> <!-- MySql DataSource Sample Setup --> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.40.8:3306/db_zhang?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
61618 配置:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <broker useJmx="false" brokerName="jdbcBroker61618" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> </persistenceAdapter> <networkConnectors> <networkConnector uri="multicast://default" dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true" /> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default"/> </transportConnectors> </broker> <!-- MySql DataSource Sample Setup --> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.40.8:3306/db_zhang?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
61620 配置:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations" value="classpath:mq.properties"/> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="dynamic-broker2" dataDirectory="${activemq.base}/data"> <!-- Destination specific policies using destination names or wildcards --> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" /> </deadLetterStrategy> </policyEntry> <policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- Use the following to configure how ActiveMQ is exposed in JMX --> <managementContext> <managementContext createConnector="true" connectorPort="1100"/> </managementContext> <!-- Configure network connector to use multicast protocol For more information, see http://activemq.apache.org/multicast-transport-reference.html --> <networkConnectors> <networkConnector uri="multicast://default" dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true" /> </networkConnectors> <persistenceAdapter> <kahaDB directory="${activemq.base}/data/dynamic-broker2/kahadb" /> </persistenceAdapter> <!-- The maximum amount of space the broker will use before slowing down producers --> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="1 gb" name="foo"/> </storeUsage> <tempUsage> <tempUsage limit="100 mb"/> </tempUsage> </systemUsage> </systemUsage> <!-- The transport connectors ActiveMQ will listen to Configure discovery URI to use multicast protocol --> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61620" discoveryUri="multicast://default" /> </transportConnectors> </broker> </beans>
其中一个 producer 配置:
new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61618)");
其中一个 consumer 配置:
new ActiveMQConnectionFactory("tcp://localhost:61620");
producer 往(61616主,61618从)发送消息,consumer 可以从 61620 获取消息。当 61616 下线后,61618 会成为主,并和 61620 构成集群。
以上是关于ActiveMQ 集群和主从的主要内容,如果未能解决你的问题,请参考以下文章