activeMq持久化方案

Posted bilifuture

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activeMq持久化方案相关的知识,希望对你有一定的参考价值。

ActiveMq消息持久化方案

非持久化消息存储

正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。能够存储的最大消息数据在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点SystemUsage配置设置了一些系统内存和硬盘容量

<systemUsage>  <systemUsage>  <memoryUsage>  //设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过ActiveMQ本身设置的最大内存大小。其中的percentOfJvmHeap属性表示百分比。占用70%的堆内存 <memoryUsage percentOfJvmHeap="70" />  </memoryUsage>  <storeUsage>  //该标记设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。该子标记的limit属性必须要进行设置  <storeUsage limit="100 gb"/>  </storeUsage>  <tempUsage>  //一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,非持久化消息就会被转储到 temp store区域, //虽然非持久化消息不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时非持久化消息大量堆积致使内存耗尽的情况出现, //还是会将非持久化消息写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp store区域的“可用磁盘空间限制”  <tempUsage limit="50 gb"/>  </tempUsage>  </systemUsage> </systemUsage>

从上面的配置一个结论,当非持久化消息堆积到一定程度的时候,也就是内存超过指定的设置阀值时,ActiveMQ会将内存中的非持久化消息写入到临时文件,以便腾出内存。但是它和持久化消息的区别是,重启之后,持久化消息会从文件中恢复,非持久化的临时文件会直接删除


持久化消息存储

ActiveMQ的内核是Java编写的,也就是说如果服务端没有Java运行环境ActiveMQ是无法运行的。ActiveMQ启动时,启动脚本使用wrapper包装器来启动JVMJVM相关的配置信息在启动目录的“wrapper.conf”配置文件中。各位读者可以通过改变其中的配置项,设置JVM的初始内存大小和最大内存大小(当然还可以进行其他和JVM有关的设置,例如开启debug模式)。

activeMq持久化方案

消息持久性对于可靠消息传递来说是一种比较好的方法,即时发送者和接者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去。消息持久性的原理很简单,就是在发送消息出去后,消息中心首先将消息存储在本地文件、内存或者远程数据库,然后把消息发送给接受者,发送成功后再把消息从存储中删除,失败则继续尝试。

ActiveMQ支持的存储方案:KahaDB存储(默认存储方式)、JDBC存储、Memory存储、LevelDB存储、JDBC With ActiveMQ Journal。其中KahaDBLevelDB的工作原理基本类似,都采用内存+磁盘介质的方案:内存用于存放信息的位置索引,磁盘介质上存放消息内容。而关系型数据库的方案,ActiveMQ将完全通过JDBC对数据库进行操作完成消息的存储和修改。


KahaDb

同样在ActiveMq.xml中配置如下:

 <!-- Configure message persistence for the broker. The default persistence mechanism is the KahaDB store (identified by the kahaDB tag). For more information, see: http://activemq.apache.org/persistence.html --> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>

KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。

activeMq持久化方案

data/kahadb这个目录下,会生成四个文件

db.data 它是消息的索引文件,本质上是B-TreeB树),使用B-Tree作为索引指向db-*.log里面存储的消息

db.redo 用来进行消息恢复

db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增

lock文件锁,表示当前获得kahadb读写权限的broker

 

常见参数配置:

参数名称

默认值

描述

directory

activemq-data

消息文件和日志的存储目录

indexWriteBatchSize

1000

Metadata cache区域和Metadata store区域不同的索引数量达到这个值后,Metadata cache将会发起checkpoint同步

indexCacheSize

10000

内存中,索引的页大小。超过这个大小Metadata cache将会发起checkpoint同步

 

enableIndexWriteAsync

false

索引是否异步写到消息文件中,建议不要设置为true

journalMaxFileLength

32mb

一个消息文件的大小

enableJournalDiskSyncs

true

如果为true,保证使用同步写入的方式持久化消息到journal文件中

cleanupInterval

30000

清除(清除或归档)不再使用的db-*.log文件的时间周期(毫秒)。

checkpointInterval

5000

写入索引信息到metadata store中的时间周期(毫秒)

ignoreMissingJournalfiles

false

是否忽略丢失的journal文件。如果为false,当丢失了journal文件时,broker启动时会抛异常并关闭

checkForCorruptJournalFiles

false

检查消息文件是否损坏,true,检查发现损坏会尝试修复

checksumJournalFiles

false

产生一个checksum,以便能够检测journal文件是否损坏。

JDBC方式

配置如下:

<broker> <!-- 配置ActiveMQ连接到mysql服务 --> <!-- 记得去掉原来的KahaDB或者LevelDB的配置 --> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql_datasource" createTablesOnStartup="true"/> </persistenceAdapter> ......</broker> <!-- 就是spring的配置文件结构 --><bean id="mysql_datasource" class="org.apache.commons.dbcp.BasicDataSource"> <property name="driverClass" value="com.mysql.jdbc.Driver"/> <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/activemqdb?relaxAutoCommit=true&amp;useUnicode=true&amp;characterEncoding=utf-8"/> <property name="user" value="root"/> <property name="password" value="123456"/> <property name="minPoolSize" value="10"/> <property name="maxPoolSize" value="30"/> <property name="initialPoolSize" value="10"/></bean>

添加Jar包依赖

在配置关系型数据库作为ActiveMQ的持久化存储方案时,要注意几个事项:

1配置信息建议放置在您的jetty.xml配置文件中,也可以放置在activemq.xml配置文件中。除此之外,还要记得需要使用到的相关jar文件放置到ActiveMQ安装路径下的./lib目录。例如使用mysql + dpcp的配置中,需要的jar包至少包括:mysql-jdbc驱动的jar包和dbcpjar包。

2jdbcPersistenceAdapter标签中,我们设置了createTablesOnStartup属性为true,这是为了在第一次启动ActiveMQ时,ActiveMQ服务节点会自动创建所需要的数据表。启动完成后,可以去掉这个属性,或者更改createTablesOnStartup属性为false

3在配置和测试的过程中,可能这样的问题:“java.lang.IllegalStateException: BeanFactory not initialized or already closed”这是因为您的操作系统的机器名中有“_”符号。更改机器名并且重启后,即可解决问题。

 

LevelDb存储

LevelDb是能够处理十亿级别规模Key-Value型数据持久性存储的C++ 程序库,由Google发起并开源。LevelDB只能由本操作系统的其他进程调用,所以它不具有网络性。如果您需要网络上的远程操作LevelDB,那么就要自行封装服务层。

配置如下:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">  <persistenceAdapter> <levelDB directory="${activemq.data}/levelDB"/> </persistenceAdapter></broker>

LevelDB中的核心设计算法是跳跃表(Skip List),核心操作策略是对磁盘上的数据日志结构进行归并(LSM)。跳跃表实际上是二叉平衡树的一种变形结构,它通过将一个有序链表进行升维操作,从而减少每一层上需要遍历的数据数量,达到快速查找的目的。

 

Memory消息存储

内存消息存储主要是存储所有的持久化的消息在内存中。persistent=false,表示不设置持久化存储,直接存储到内存中,配置如下:

<broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="localhost" dataDirectory="activemq-data">


JDBC with ActiveMQ Journal

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。举个例子,生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB配置如下:

<!-- <persistenceAdapter>--><!-- &lt;!&ndash;<kahaDB directory="activemq-data/kahadb"/>&ndash;&gt;--><!-- &lt;!&ndash;<jdbcPersistenceAdapter dataSource="#mysql_datasource" createTablesOnStartup="true"/>&ndash;&gt;--><!-- &lt;!&ndash; <levelDB directory="activemq-data"/>&ndash;&gt;--><!-- </persistenceAdapter>--> <persistenceFactory> <journalPersistenceAdapterFactory dataSource="#mysql_datasource" createTablesOnStartup="true"/> </persistenceFactory>






以上是关于activeMq持久化方案的主要内容,如果未能解决你的问题,请参考以下文章

Activemq 是不是保证持久化?

activeMQ 持久化配置

ActiveMQ:批量发布消息,持久但不异步?

SpringBoot整合ActiveMQ开启持久化

ActiveMQ持久化消息的三种方式

ActiveMQ+ZooKeeper搭建高可用集群