ActiveMQ中持久化协议 高可用集群
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ中持久化协议 高可用集群相关的知识,希望对你有一定的参考价值。
前言
本篇文章会从消息在MQ中的存储、移除流程、持久化消息的持久化存储时机、解activemq支持哪些持久化存储方式、持久化方式如何配置、ActiveMQ支持的协议与传输方式,高可用集群及适用场景等来解析ActiveMQ
ActiveMQ中持久化
消息在设置过持久化模式,则会持久化起来。
消息传递流程
这里在activeMQ中会存储,并返回ack进行确认发送成功,生产者会一直等待着。消息存储起来不一定是非持久化或者是持久化、这里消息如果是持久化则,需要存到磁盘才返回ack
消费者订阅过后,MQ会推送消息给消费者;最后消费成功才删除消息
ActiveMQ支持的持久化方式
- AMQ
- JDBC ActiveMQ V4 加入
- KahaDB ActiveMQ V5.3 加入,5.4开始为默认持久化方式
- LevelDB ActiveMQ V5.8 加入 官方已废弃并不在支持
- Replicated LevelDB Store ActiveMQ V5.8 加入 官方已废弃并不在支持
配置方式
<xs:element name="persistenceAdapter" maxOccurs="1" minOccurs="0">
<xs:annotation>
<xs:documentation>
<![CDATA[ Sets the persistence adaptor implementation to use for this broker]]>
</xs:documentation>
</xs:annotation>
<xs:complexType>
<xs:choice maxOccurs="1" minOccurs="0">
<xs:element ref="tns:jdbcPersistenceAdapter"/>
<xs:element ref="tns:journalPersistenceAdapter"/>
<xs:element ref="tns:kahaDB"/>
<xs:element ref="tns:levelDB"/>
<xs:element ref="tns:mKahaDB"/>
<xs:element ref="tns:memoryPersistenceAdapter"/>
<xs:element ref="tns:replicatedLevelDB"/>
<xs:any namespace="##other"/>
</xs:choice>
</xs:complexType>
</xs:element>
AMQ
<broker brokerName="broker" >
<persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.base}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>
</broker>
这里包含了 数据日志:顺序写入的,对于持久化来说,每次都要把日志放到磁盘上,查询就会比较慢, 为解决慢的问题,会把日志放到缓存中;建立索引是为了快速的查找日志,而出现的。
AMQ有缺点就是会产生大量的索引文件,重启过后,会非常耗时;都是独立存储;缓存是从索引中加载的,默认是kahadb格式的
KahaDB
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
默认的生成结构
db log files:以db-递增数字.log命名。
archive directory: 当配置支持archiving(默认不支持)并且存在,该文件夹才会创建。用于存储不再 需要的data logs。
db.data:存储btree索引
db.redo:用于hard-stop broker后,btree索引的重建
相对AMQ来说存储的更加小,所有目标的消息都存储在一起。
将日志文件大小等等属性。都可以使用,需要时在配置
如果超出大小,不会覆盖而是采用 _1后缀进行新建文件
JDBC
- 引入mysql的驱动jar,放到 activemq的 lib目录下
- 配置 activemq.xml
<broker ...>
...
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
...
</broker>
<!-- MySql DataSource Sample Setup -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<!-- 【注意】一定要带参数 relaxAutoCommit=true -->
<property name="url" value="jdbc:mysql://localhost/activemq? relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
并且会在数据库中创建几个表用以存储日志
lock表存分布式锁的。 msg就是存消息的。
Multi(m) kahaDB Persistence Adapter
配置
kahaDB的每个实例都可以独立配置。如果没有向服务器提供目的地filteredKahaDB,隐式默认值将匹配任何目标、队列或主题。这是一个方便的一网打尽。如果找不到匹配的持久性适配器,则目标创建将失败除了一个例外。filteredKahaDB与每个目标共享其通配符匹配规则政策。在ActiveMQ 5.15中,filteredKahaDB支持名为usage的StoreUsage属性。这允许要对匹配队列施加的单个磁盘限制。
<broker brokerName="broker">
<persistenceAdapter>
<mKahaDB directory="${activemq.base}/data/kahadb">
<filteredPersistenceAdapters>
<!-- match all queues -->
<filteredKahaDB queue=">">
<usage>
<storeUsage limit="1g" />
</usage>
<persistenceAdapter>
<kahaDB journalMaxFileLength="32mb"/>
</persistenceAdapter>
</filteredKahaDB>
<!-- match all destinations -->
<filteredKahaDB>
<persistenceAdapter>
<kahaDB enableJournalDiskSyncs="false"/>
</persistenceAdapter>
</filteredKahaDB>
</filteredPersistenceAdapters>
</mKahaDB>
</persistenceAdapter>
</broker>
做一个通配的写法
自动每目标持久性适配器
在catch all上设置perDestination=“true”,即当未设置明确的目的地时,filteredKahaDB条目。每个匹配的目的地都将被分配其自己的实例。
做通配符的形式
<broker brokerName="broker">
<persistenceAdapter>
<mKahaDB directory="${activemq.base}/data/kahadb">
<filteredPersistenceAdapters> <!-- kahaDB per destinations -->
<filteredKahaDB perDestination="true">
<persistenceAdapter>
<kahaDB journalMaxFileLength="32mb"/>
</persistenceAdapter>
</filteredKahaDB>
</filteredPersistenceAdapters>
</mKahaDB>
</persistenceAdapter>
</broker>
协议
官方文档:
协议配置方式
在默认配置文件中,帮我们开启了大量的协议,根据不同端口号去选择不同协议。这里可以根据自己进行改变即可。
<!--The transport connectors expose ActiveMQ over a given protocol to clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html -->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672? maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613? maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883? maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614? maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
基本是保持默认就可以,也可以根据需要进行修改
OpenWire
通过字节数组的方式
AMQP
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.37.0</version>
</dependency>
// 1、创建连接工厂 JmsConnectionFactory connectionFactory = new JmsConnectionFactory(null, null, brokerUrl);
然后其他的使用方式就不用做改变
// 1、创建连接工厂
connectionFactory = new JmsConnectionFactory(null, null, brokerUrl);
// 2、创建连接 conn = connectionFactory.createConnection();
conn.start();
// 一定要start
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息发送目标 (Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
........
支持 nio部分
这里就能看出 ActiveMQ是基于AMQP1.0版本实现的,而rabbitMQ是基于0.9版本实现的。
MQTT
客户端使用:
- 引入mqtt客户端jar
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.15</version>
</dependency>
- 编写客户端代码 这个代码比较底层化的代码
public static void main(String[] args) throws Exception {
MQTT mqtt = new MQTT(); mqtt.setHost("localhost", 1883);
// mqtt.setUserName(user);
// mqtt.setPassword(password);
FutureConnection connection = mqtt.futureConnection();
connection.connect().await();
UTF8Buffer topic = new UTF8Buffer("foo/blah/bar");
Buffer msg = new AsciiBuffer("mqtt message");
Future<?> f = connection.publish(topic, msg, QoS.AT_LEAST_ONCE, false);
f.await();
connection.disconnect().await();
System.exit(0);
}
服务端
AUTO
安全
认证
简单认证
<plugins>
<!-- Configure authentication; Username, passwords and groups -->
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager" groups="users,admins"/>
<authenticationUser username="user" password="password" groups="users"/>
<authenticationUser username="guest" password="password" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
<simpleAuthenticationPlugin anonymousAccessAllowed="true">.....
- 在activemq.xml的broker中配置JAAS插件。
<plugins>
<!--use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<jaasAuthenticationPlugin configuration="activemq" />
</plugins>
activemq {
org.apache.activemq.jaas.PropertiesLoginModule required
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};
- 在conf/users.properties中配置用户
#用户名=密码
admin=admin
user=password
guest=guest
- 在conf/groups.properties中配置用户组(角色)
#组名=用户1,用户2 组名自定义
admins=admin
users=admin,user
guests=guest
权限控制
这里都可以在配置中添加
ActiveMQ高可用集群
Master Slave
ActiveMQ提供了主从集群机制来实现高可用。
主和从通过分布式锁进行选择的,存在公用的数据实现的,而且这个中间数据不能被杀死
- 多个Broker实例共享存储
- 多个Broker通过抢独占锁来成为Master
- Master节点对外提供服务,Slave节点暂停等待独占锁
- Master节点故障,非持久化消息将丢失,所以一般要用持久化消息。
- 客户端以多Broker故障恢复方式进行连接
failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616)
Broke1 重启
- Shared File System Master Slave 独占锁 是共享存储目录下的lock文件
- JDBC Master Slave 独占锁为 activemq_lock表记录
之前说过在日志记录中 KahaDB 目录下面有个lock这个lock就是分布式锁实现的。
大量高并发 场景下,这个集群还是不行的。没有负载均衡的。不适合的。
分布式队列和主题
分布式队列 和主题就采用该方式 解决高并发的问题
从1.1开始,ActiveMQ支持代理网络,这使我们能够支持跨代理网络的分布式队列和主题。这允许客户端连接到网络中的任何代理,并在必要时故障转移到另一个代理存在一个失败-从客户机的角度提供一个HA代理集群
- 独立的Broker彼此相连;
- 客户端采用Failover方式连接任意一个Broker;
- 客户端生产的消息发送到它连接的broker,并存储在该broker上;
- 消费者客户端可以连接任意Broker来消费目标的消息。
存储/转发中的分布式队列
当我们在队列上发布消息时,它存储在出版商正在沟通。然后,如果该代理被配置为存储/转发到其他代理对于客户端,代理将把它发送给这些客户端中的一个(可以是节点或代理)取决于调度算法)。此分派算法将继续,直到消息被删除为止最终由客户机发送和使用。在任何时间点,在消息被消费之前,消息将只存在于一个代理的存储中。注意只有在其他代理上有消费者时,才会将消息分发到这些代理上。e、 g.如果A上的队列中有代理A、B、C和发布者。如果队列中有消费者在A和B上,队列的消息将分布在代理A和B上;一些消息将发送到B,一些消息将在A上使用,没有消息将发送到C。如果队列中有消费者从C开始,然后消息也将在那里流动。如果消费者停止,则不显示更多消息被派往C。
<broker brokerName="receiver" persistent="false" useJmx="false">
<networkConnectors>
<!-- 配置要网络连接的其他Broker -->
<networkConnector uri="static: (tcp://host1:61616,tcp://host2:61616,tcp://..)"/>
</networkConnectors>
...
</broker>
uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false"
multicast 动态发现方式:
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
所有broker开启multicast:
客户端连接:
failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616)
Broker网络连接的不足:缺乏高可用
Networks + Master-Slave
将两种就进行组合,才能达到高可用。
networkConnectors 配置:
客户端连接:
failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616,...)? randomize=true
以上是关于ActiveMQ中持久化协议 高可用集群的主要内容,如果未能解决你的问题,请参考以下文章