ActiveMQ中持久化协议 高可用集群

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ中持久化协议 高可用集群相关的知识,希望对你有一定的参考价值。

前言

本篇文章会从消息在MQ中的存储、移除流程、持久化消息的持久化存储时机、解activemq支持哪些持久化存储方式、持久化方式如何配置、ActiveMQ支持的协议与传输方式,高可用集群及适用场景等来解析ActiveMQ

ActiveMQ中持久化

消息在设置过持久化模式,则会持久化起来。

消息传递流程

这里在activeMQ中会存储,并返回ack进行确认发送成功,生产者会一直等待着。消息存储起来不一定是非持久化或者是持久化、这里消息如果是持久化则,需要存到磁盘才返回ack

消费者订阅过后,MQ会推送消息给消费者;最后消费成功才删除消息

ActiveMQ支持的持久化方式

ActiveMQ支持的持久化方式

  • AMQ
  • JDBC ActiveMQ V4 加入
  • KahaDB ActiveMQ V5.3 加入,5.4开始为默认持久化方式
  • LevelDB ActiveMQ V5.8 加入 官方已废弃并不在支持
  • Replicated LevelDB Store ActiveMQ V5.8 加入 官方已废弃并不在支持

 配置方式

配置方式

<xs:element name="persistenceAdapter" maxOccurs="1" minOccurs="0"> 
    <xs:annotation> 
        <xs:documentation> 
        <![CDATA[ Sets the persistence adaptor implementation to use for this broker]]>             
       </xs:documentation> 
     </xs:annotation> 
     <xs:complexType> 
        <xs:choice maxOccurs="1" minOccurs="0"> 
            <xs:element ref="tns:jdbcPersistenceAdapter"/> 
            <xs:element ref="tns:journalPersistenceAdapter"/> 
            <xs:element ref="tns:kahaDB"/> 
            <xs:element ref="tns:levelDB"/> 
            <xs:element ref="tns:mKahaDB"/> 
            <xs:element ref="tns:memoryPersistenceAdapter"/> 
            <xs:element ref="tns:replicatedLevelDB"/> 
            <xs:any namespace="##other"/> 
        </xs:choice> 
     </xs:complexType> 
</xs:element>

AMQ

基于文件存储。它具有写入速度快和容易恢复的特点,但是由于其重建索引时间过长,而且索引文件占 用磁盘空间过大,所以已经不推荐使用。
<broker brokerName="broker" > 
    <persistenceAdapter> 
        <amqPersistenceAdapter directory="${activemq.base}/activemq-data" maxFileLength="32mb"/> 
    </persistenceAdapter> 
</broker>

这里包含了 数据日志:顺序写入的,对于持久化来说,每次都要把日志放到磁盘上,查询就会比较慢, 为解决慢的问题,会把日志放到缓存中;建立索引是为了快速的查找日志,而出现的。

AMQ有缺点就是会产生大量的索引文件,重启过后,会非常耗时;都是独立存储;缓存是从索引中加载的,默认是kahadb格式的

KahaDB

KahaDB 是一个基于文件的持久性数据库,它位于使用它的消息代理的本地。它已经为快速持久性进行 了优化。它是ActiveMQ 5.4 以来的默认存储机制。 KahaDB 使用更少的文件描述符,并且比它的前身 AMQ消息存储提供更快的恢复。 【官方推荐使用的持久化存储方式】
<persistenceAdapter> 
    <kahaDB directory="${activemq.data}/kahadb"/> 
</persistenceAdapter>

默认的生成结构

db log files:以db-递增数字.log命名。
 archive directory: 当配置支持archiving(默认不支持)并且存在,该文件夹才会创建。用于存储不再 需要的data logs。 
db.data:存储btree索引
 db.redo:用于hard-stop broker后,btree索引的重建

相对AMQ来说存储的更加小,所有目标的消息都存储在一起。

将日志文件大小等等属性。都可以使用,需要时在配置

 如果超出大小,不会覆盖而是采用 _1后缀进行新建文件

JDBC

对于长期的持久性,建议使用 JDBC 和高性能日志。如果愿意,可以只使用 JDBC ,但是它非常慢。
JDBC + 高性能日志用: journalPersistenceAdapter 参考: http://activemq.apache.org/persistence
这个就支持两种方式,一是磁盘,二是数据库中;并不是存两份,

 

MySQL 示例
  • 引入mysql的驱动jar,放到 activemq lib目录下
  • 配置 activemq.xml
<broker ...> 
    ... 
    <persistenceAdapter> 
    <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> 
    <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
    </persistenceAdapter> 
     ... 
</broker>


<!-- MySql DataSource Sample Setup --> 
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <!-- 【注意】一定要带参数 relaxAutoCommit=true --> 
        <property name="url" value="jdbc:mysql://localhost/activemq? relaxAutoCommit=true"/> 
        <property name="username" value="activemq"/> 
        <property name="password" value="activemq"/> 
        <property name="poolPreparedStatements" value="true"/> 
</bean>

 并且会在数据库中创建几个表用以存储日志

 lock表存分布式锁的。 msg就是存消息的。

 Multi(m) kahaDB Persistence Adapter

kahaDB 应用,也是由于kahaDB是存在一个地方的,但是,对于不同的消息混在一起的,存储时间效率不同都放在一起,
ActiveMQ 5.6: 可以跨多个 kahdb 持久性适配器分发目标存储。你什么时候会这么做 ? 如果您有一个快速 的生产者/ 消费者目的地和另一个定期的生产者目的地,它们具有不规则的批处理消费,那么随着未消费的消息分布在多个日志文件中,磁盘使用可能会失控。每个一个单独的日志,可以确保最小限度地使用日志。此外,有些目的地可能很重要,需要磁盘同步,而有些则不是。在这些情况下,您可以使用mKahaDB持久性适配器,并使用通配符过滤目的地,就像使用目的地策略一样。
不规则的批处理消息。
最后就会导致大量的消息堆积。

配置

kahaDB的每个实例都可以独立配置。如果没有向服务器提供目的地filteredKahaDB,隐式默认值将匹配任何目标、队列或主题。这是一个方便的一网打尽。如果找不到匹配的持久性适配器,则目标创建将失败除了一个例外。filteredKahaDB与每个目标共享其通配符匹配规则政策。在ActiveMQ 5.15中,filteredKahaDB支持名为usage的StoreUsage属性。这允许要对匹配队列施加的单个磁盘限制。

<broker brokerName="broker"> 
    <persistenceAdapter> 
    <mKahaDB directory="${activemq.base}/data/kahadb"> 
    <filteredPersistenceAdapters> 
      <!-- match all queues --> 
        <filteredKahaDB queue=">"> 
            <usage> 
                <storeUsage limit="1g" /> 
            </usage> 
            <persistenceAdapter> 
                <kahaDB journalMaxFileLength="32mb"/> 
            </persistenceAdapter> 
        </filteredKahaDB> 
        <!-- match all destinations --> 
        <filteredKahaDB> 
            <persistenceAdapter> 
                <kahaDB enableJournalDiskSyncs="false"/> 
            </persistenceAdapter> 
        </filteredKahaDB> 
    </filteredPersistenceAdapters> 
    </mKahaDB> 
    </persistenceAdapter> 
</broker>

做一个通配的写法

自动每目标持久性适配器

在catch all上设置perDestination=“true”,即当未设置明确的目的地时,filteredKahaDB条目。每个匹配的目的地都将被分配其自己的实例。

做通配符的形式

<broker brokerName="broker"> 
    <persistenceAdapter> 
        <mKahaDB directory="${activemq.base}/data/kahadb"> 
        <filteredPersistenceAdapters> <!-- kahaDB per destinations --> 
            <filteredKahaDB perDestination="true"> 
                <persistenceAdapter> 
                    <kahaDB journalMaxFileLength="32mb"/> 
                </persistenceAdapter> 
            </filteredKahaDB> 
        </filteredPersistenceAdapters>
        </mKahaDB> 
    </persistenceAdapter> 
</broker>

协议

官方文档:

https://activemq.apache.org/uri-protocols
https://activemq.apache.org/protocols
http://activemq.apache.org/confifiguring-transports.html
ActiveMQ 支持多种协议,包括下面的
AMQP 、AUTO 、MQTT 、OpenWire 、REST 、RSS and Atom 、Stomp 、WSIF 、WS Notifification 、XMPP
当选择使用某种协议时,一定要从 ActiveMQ 官网了解清楚它实现的该协议的那个版本。
在官网中有大量协议介绍

协议配置方式

官方文档

在默认配置文件中,帮我们开启了大量的协议,根据不同端口号去选择不同协议。这里可以根据自己进行改变即可。

 

<!--The transport connectors expose ActiveMQ over a given protocol to clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html --> 
<transportConnectors> 
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->     
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    <transportConnector name="ws" uri="ws://0.0.0.0:61614? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
</transportConnectors>

基本是保持默认就可以,也可以根据需要进行修改

OpenWire

OpenWire Apache 的一种跨语言的协议,允许从不同的语言和平台访问 ActiveMQ ,是 ActiveMQ 4.x 以后的版本默认的传输协议
包括下面的官方文档 以及简介
http://activemq.apache.org/openwire
http://activemq.apache.org/openwire-version-2-specifification.html
http://activemq.apache.org/wire-protocol.html

通过字节数组的方式

AMQP 

服务端配置说明

客户端使用:
引入 AQMP 实现客户端
<dependency> 
    <groupId>org.apache.qpid</groupId> 
    <artifactId>qpid-jms-client</artifactId> 
    <version>0.37.0</version> 
</dependency>
连接工厂换成如下的 JmsConnectionFactory
// 1、创建连接工厂 JmsConnectionFactory connectionFactory = new JmsConnectionFactory(null, null, brokerUrl);

然后其他的使用方式就不用做改变

// 1、创建连接工厂 
connectionFactory = new JmsConnectionFactory(null, null, brokerUrl); 
// 2、创建连接 conn = connectionFactory.createConnection(); 
conn.start(); 
// 一定要start 
// 3、创建会话(可以创建一个或者多个session) 
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
// 4、创建消息发送目标 (Topic or Queue) 
Destination destination = session.createQueue(destinationUrl);

........

支持 nio部分

 

 

这里就能看出 ActiveMQ是基于AMQP1.0版本实现的,而rabbitMQ是基于0.9版本实现的。

MQTT

MQTT Message Queuing Telemetry Transport )消息队列遥测传输是 IBM 开发的一个即时通讯协
议,已成为物联网通信的标准。
它是topic,没有队列的方式

 客户端使用:

  • 引入mqtt客户端jar
<dependency> 
    <groupId>org.fusesource.mqtt-client</groupId> 
    <artifactId>mqtt-client</artifactId> 
    <version>1.15</version> 
</dependency>
  • 编写客户端代码  这个代码比较底层化的代码
publisher:
public static void main(String[] args) throws Exception {
 MQTT mqtt = new MQTT(); mqtt.setHost("localhost", 1883); 
// mqtt.setUserName(user); 
// mqtt.setPassword(password);
 FutureConnection connection = mqtt.futureConnection(); 
connection.connect().await();
 UTF8Buffer topic = new UTF8Buffer("foo/blah/bar"); 
Buffer msg = new AsciiBuffer("mqtt message"); 
Future<?> f = connection.publish(topic, msg, QoS.AT_LEAST_ONCE, false);
 f.await(); 
connection.disconnect().await(); 
System.exit(0); 
}

服务端

 

AUTO

ActiveMQ 5.13.0 开始, ActiveMQ 开始支持协议格式检测,可以自动检测 OpenWire STOMP 、 AMQP和 MQTT 。允许这 4 种类型的客户端共享一个传输。

安全

ActiveMQ 默认并未开启安全访问控制。支持对 Queue Topic 的认证、鉴权。
https://activemq.apache.org/security

认证

简单认证

activemq.xml broker 中配置简单认证的插件及用户,在配置文件中定义
<plugins>
 <!-- Configure authentication; Username, passwords and groups -->         
     <simpleAuthenticationPlugin> 
            <users> 
                <authenticationUser username="system" password="manager" groups="users,admins"/> 
                <authenticationUser username="user" password="password" groups="users"/>        
                 <authenticationUser username="guest" password="password" groups="guests"/>     
            </users> 
     </simpleAuthenticationPlugin> 
</plugins>
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
支持匿名访问
<simpleAuthenticationPlugin anonymousAccessAllowed="true">.....
JAAS 方式
 
  • activemq.xmlbroker中配置JAAS插件。
<plugins> 
<!--use JAAS to authenticate using the login.config file on the classpath to configure JAAS --> 
<jaasAuthenticationPlugin configuration="activemq" />
 </plugins>
confifiguration="activemq" 指定使用 login.confifig 中的 "activemq" 配置。
conf/login.confifig
activemq { 
  org.apache.activemq.jaas.PropertiesLoginModule required 
  org.apache.activemq.jaas.properties.user="users.properties" 
  org.apache.activemq.jaas.properties.group="groups.properties"; 
};
  • conf/users.properties中配置用户  
#用户名=密码
 admin=admin
 user=password 
guest=guest
  • conf/groups.properties中配置用户组(角色)
#组名=用户1,用户2 组名自定义 
admins=admin 
users=admin,user 
guests=guest

权限控制

控制用户组对 Queue/Topic 的操作权限。
权限有: 包括 只读  只写 以及 admin的

这里都可以在配置中添加

 

 ActiveMQ高可用集群

Master Slave

主从方案  高可用集群
https://activemq.apache.org/masterslave

 ActiveMQ提供了主从集群机制来实现高可用。  

主和从通过分布式锁进行选择的,存在公用的数据实现的,而且这个中间数据不能被杀死

  • 多个Broker实例共享存储
  • 多个Broker通过抢独占锁来成为Master
  • Master节点对外提供服务,Slave节点暂停等待独占锁
  • Master节点故障,非持久化消息将丢失,所以一般要用持久化消息。
  • 客户端以多Broker故障恢复方式进行连接
failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616)
Broke1 Master 故障

Broke1 重启

 

ActiveMQ 中支持如下两种 Master Slave 实现方式:
  • Shared File System Master Slave 独占锁 是共享存储目录下的lock文件
https://activemq.apache.org/shared-fifile-system-master-slave
  • JDBC Master Slave 独占锁为 activemq_lock表记录
https://activemq.apache.org/jdbc-master-slave

 

之前说过在日志记录中 KahaDB 目录下面有个lock这个lock就是分布式锁实现的。

大量高并发 场景下,这个集群还是不行的。没有负载均衡的。不适合的。

 分布式队列和主题

分布式队列  和主题就采用该方式 解决高并发的问题 

从1.1开始,ActiveMQ支持代理网络,这使我们能够支持跨代理网络的分布式队列和主题。这允许客户端连接到网络中的任何代理,并在必要时故障转移到另一个代理存在一个失败-从客户机的角度提供一个HA代理集群

原理说明:
  • 独立的Broker彼此相连;
  • 客户端采用Failover方式连接任意一个Broker
  • 客户端生产的消息发送到它连接的broker,并存储在该broker上;
  • 消费者客户端可以连接任意Broker来消费目标的消息。

存储/转发中的分布式队列

当我们在队列上发布消息时,它存储在出版商正在沟通。然后,如果该代理被配置为存储/转发到其他代理对于客户端,代理将把它发送给这些客户端中的一个(可以是节点或代理)取决于调度算法)。此分派算法将继续,直到消息被删除为止最终由客户机发送和使用。在任何时间点,在消息被消费之前,消息将只存在于一个代理的存储中。注意只有在其他代理上有消费者时,才会将消息分发到这些代理上。e、 g.如果A上的队列中有代理A、B、C和发布者。如果队列中有消费者在A和B上,队列的消息将分布在代理A和B上;一些消息将发送到B,一些消息将在A上使用,没有消息将发送到C。如果队列中有消费者从C开始,然后消息也将在那里流动。如果消费者停止,则不显示更多消息被派往C。

配置:
静态 IP 配置方式:
<broker brokerName="receiver" persistent="false" useJmx="false"> 
    <networkConnectors> 
    <!-- 配置要网络连接的其他Broker --> 
        <networkConnector uri="static: (tcp://host1:61616,tcp://host2:61616,tcp://..)"/>     
        </networkConnectors> 
    ...
</broker>
一些有用 的重试参数
uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false"

 multicast 动态发现方式:

<networkConnectors> 
    <networkConnector uri="multicast://default"/> 
</networkConnectors>

所有broker开启multicast

 

客户端连接:

failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616)

 Broker网络连接的不足:缺乏高可用

Networks + Master-Slave

将两种就进行组合,才能达到高可用。

networkConnectors 配置:

 客户端连接:

failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616,...)? randomize=true

以上是关于ActiveMQ中持久化协议 高可用集群的主要内容,如果未能解决你的问题,请参考以下文章

四ActiveMQ Zookeeper集群

ActiveMQ高可用集群方案

activemq+Zookeper高可用集群方案配置

ActiveMQ 高可用集群安装配置(ZooKeeper + LevelDB)

ActiveMQ_伪集群和主从高可用使用

activemq 高可用+集群