LinuxWindows安装ActiveMQ
Posted 用生命研发技术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LinuxWindows安装ActiveMQ相关的知识,希望对你有一定的参考价值。
Linux、Windows安装ActiveMQ
一、下载
下载地址:https://activemq.apache.org/components/classic/download/
选择相应的系统版本
二、Linux安装
2.1、下载方式
- 下载tar.gz包到本地,然后上传到服务器
- 通过wget下载
安装路径:/opt/home/mq
## 下载
wget https://dlcdn.apache.org/activemq/5.16.3/apache-activemq-5.16.3-bin.tar.gz
## 复制到其它服务器
scp -r /opt/home/mq/apache-activemq-5.16.3-bin.tar.gz root@192.168.0.5:/opt/home/mq/
## 解压
tar -zxvf apache-activemq-5.16.3-bin.tar.gz
## 切换到apache-activemq-5.16.3/bin/目录下
cd apache-activemq-5.16.3/bin/
## 启动
./activemq start
## 停止
./activemq stop
## 重启
./activemq restart
## 查看状态
./activemq status
## 检查端口是否使用
netstat -lnp | grep 8161
netstat -lnp | grep 61616
2.2、测试
浏览器访问:http://192.168.0.3:8161/
发现无法访问此网站,防火墙放行端口也无法访问
2.3、配置
这是因为 Activemq 默认只能当前服务器访问,如果需要外部访问,需要修改jetty.xml文件
## 修改apache-activemq-5.16.3/conf/jetty.xml文件
vim /opt/home/mq/apache-activemq-5.16.3/conf/jetty.xml
## 找到id为jettyPort的bean 将其中的127.0.0.1 改为 0.0.0.0
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
## 重启ActiveMQ
./activemq restart
再通过浏览器访问:http://192.168.0.3:8161/
默认用户名/密码:admin/admin
三、Windows安装
下载解压后,进入bin\\win64下,双击activemq.bat,启动
访问:http://127.0.0.1:8161
选择Manage ActiveMQ broker(管理ActiveMQ代理),输入账号和密码登陆。
默认用户名/密码:admin/admin
四、ActiveMQ集群
集群分为两种方式:
-
伪集群:集群节点都搭在一台机器上
-
真集群:集群节点分布在多台机器上
4.1、为什么使用集群
-
实现高可用,以排除单点故障引起的服务中断。
-
实现负载均衡,以提升效率为更多的客户提供服务。
4.2、Broker Clusters 部署
Broker-Cluster的部署方式就可以解决负载均衡的问题。Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue,保证消息同步。
各个broker进行消息同步使用的是NetworkConnection (网络连接器),主要用于配置各个broker之间的网络通讯方式,用于服务器传递信息。 分为静态连接器和动态连接器。
静态连接器
<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>
动态连接器
<!-- 网络连接器 -->
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
<!-- 传输连接器 -->
<transprotConnectors>
<transprotConnector uri="tcp://localhost:0" discoveryUri="multicast://default" />
</transprotConnectors>
静态连接器过于局限,动态连接器可随意扩展服务器连接。
4.3、Master Slave 部署(主从)
通过部署多个broker实例,选举产生一个master和多个slave,master宕机后由slave接管服务来达到高可用性。Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker Cluster的部署方式刚好可以解决负载均衡的问题。一般两者结合使用。
这里主要介绍2种配置方案
- Shared File System Master slave 模式
- JDBC Store Master Slave 模式
4.3.1、Share storage master slave(共享存储)
此模式中Master和Slave的数据是共享的(相当于共享同一个数据库),当master失效后,slave会自动接管服务,无需手动进行数据的copy与同步,因为master存储数据之后,这些数据在任何时候对slave都是可见的。
master与slave之间,通过共享文件的“排他锁”或者分布式排他锁(ZooKeeper)来决定Broker的状态与角色,获取锁权限的Broker作为master,其它的Broker则作为slave。如果master失效,它必将失去锁权限,那么其它的slave将通过锁竞争来选举新master,没有获取锁权限的Broker作为slave,并等待锁的释放(间歇性尝试获取锁)。
1、Shared File System Master Slave模式(只适合单台主机部署,不适合多台主机部署)
这种方式是最常用的模式,架构简单,可靠实用。我们只需要一个SAN文件系统即可。使用文件系统来共享数据文件,多个Broker共享同一个文件系统。
<!--
假设在本地启动两个broker,配置文件activemq.xml 里面的持久化文件目录都设置成同一个即可。
这里默认使用的kahaDB ,你也可以用levelDB,不推荐AMQDB
-->
<persistenceAdapter>
<kahaDB directory=/opt/home/mq/apache-activemq-5.16.3/data/shared_kahadb" />
</persistenceAdapter>
2、JDBC Store Master Slave模式(适合多台主机部署)
数据存储用的是数据库(mysql/Oracle等),相对于日志文件而言,JDBC Store通常认为是低效的。配置如下
<broker brokerName="broker-locahost-0">
...
<persistenceAdapter>
<!-- 这个是可以自动生成表结构的-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
</broker>
<!-- 配置jdbc数据库连接 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.0.3:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://123.57.80.91:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="Java_521"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
下载 mysql-connector-java-5.1.47.jar
放到/opt/home/mq/apache-activemq-5.16.3/lib
目录下
新建activemq数据库,
-- 如果存在数据库删除
DROP DATABASE IF EXISTS `activemq`;
-- 创建数据库
CREATE DATABASE `activemq` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';
-- 使用activemq数据库
USE activemq;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for activemq_acks
-- ----------------------------
DROP TABLE IF EXISTS `activemq_acks`;
CREATE TABLE `activemq_acks` (
`CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`SUB_DEST` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
`CLIENT_ID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`SUB_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`SELECTOR` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
`LAST_ACKED_ID` bigint(20) NULL DEFAULT NULL,
`PRIORITY` bigint(20) NOT NULL DEFAULT 5,
`XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
PRIMARY KEY (`CONTAINER`, `CLIENT_ID`, `SUB_NAME`, `PRIORITY`) USING BTREE,
INDEX `ACTIVEMQ_ACKS_XIDX`(`XID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of activemq_acks
-- ----------------------------
-- ----------------------------
-- Table structure for activemq_lock
-- ----------------------------
DROP TABLE IF EXISTS `activemq_lock`;
CREATE TABLE `activemq_lock` (
`ID` bigint(20) NOT NULL,
`TIME` bigint(20) NULL DEFAULT NULL,
`BROKER_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of activemq_lock
-- ----------------------------
INSERT INTO `activemq_lock` VALUES (1, NULL, NULL);
-- ----------------------------
-- Table structure for activemq_msgs
-- ----------------------------
DROP TABLE IF EXISTS `activemq_msgs`;
CREATE TABLE `activemq_msgs` (
`ID` bigint(20) NOT NULL,
`CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`MSGID_PROD` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
`MSGID_SEQ` bigint(20) NULL DEFAULT NULL,
`EXPIRATION` bigint(20) NULL DEFAULT NULL,
`MSG` blob NULL,
`PRIORITY` bigint(20) NULL DEFAULT NULL,
`XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_MIDX`(`MSGID_PROD`, `MSGID_SEQ`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_CIDX`(`CONTAINER`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_EIDX`(`EXPIRATION`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_PIDX`(`PRIORITY`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_XIDX`(`XID`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_IIDX`(`ID`, `XID`, `CONTAINER`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of activemq_msgs
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
数据表说明:
ACTIVEMQ_MSGS
-- 消息表,缺省表名ACTIVEMQ_MSGS,Queue和Topic都存在里面
-- ID:自增的数据库主键
-- CONTAINER:消息的Destination
-- MSGID_PROD:消息发送者的主键
-- MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
-- EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
-- MSG:消息本体的Java序列化对象的二进制数据
-- PRIORITY:优先级,从0-9,数值越大优先级越高
ACTIVEMQ_ACKS
-- 用于存储订阅关系,如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
-- CONTAINER: 消息的Destination
-- SUB_DEST: 如果是使用Static集群, 这个字段会有集群其他系统的信息
-- CLIENT_ID : 每个订阅者都必须有一个唯一的客户端ID用以区分
-- SUB_NAME: 订阅者名称
-- SELECTOR: 选择器,可以选择只消费满足条件的消息, 条件可以用自定义属性实现, 可支持多属性AND和OR操作
-- LAST_ACKED_ID: 记录消费过的ID
-- ACTIVEMQ_ACKS表存储订阅的消息和最后一个持久订阅接收的消息ID
ACTIVEMQ_LOCK
-- 在集群环境中才有用, 只有一个Broker可以获得消息, 称为Master Broker, 其他的只能作为备份等待Master Broker不可用, 才可能成为下一个Master Broker, 这个表用于记录哪个Broker是当前的Master Broker.
启动activemq,如果启动报错,尝试将数据库编码改为latin1
或者ASCII
./activemq start
测试代码
public class Producer {
private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage textMessage = session.createTextMessage("***发送消息顺序: " + i + "***发送消息优先级:" + i);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);//设置延迟时间
// producer.send(textMessage);
producer.send(queue, textMessage);
}
System.out.println("消息发送完成");
producer.close();
session.close();
connection.close();
}
}
public class Consumer {
private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
String messageText = textMessage.getText();
System.out.println("***消费者接收消息: " + messageText);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//保证控制台不灭
System.in.read();
}
}
4.3.2、Replicated LevelDB Store(使用ZooKeeper协调多个Broker)
基于复制的LevelDB Store模式是ActiveMQ 5.9以后新增的特性,这是ActiveMQ全力打造的HA存储引擎。 一般都使用这种方式。由于利用zk 进行配置管理,可以方便监控,同时配置也相对简单。
使用ZooKeeper(集群)注册所有的ActiveMQ Broker。只有其中的一个Broker可以对外提供服务(也就是Master节点),其他的Broker处于待机状态,被视为Slave。如果Master因故障而不能提供服务,则利用ZooKeeper的内部选举机制会从Slave中选举出一个Broker充当Master节点,继续对外提供服务。
由于基于ZooKeeper(通常ZooKeeper集群至少需要3个实例,才能保证ZooKeeper本身的高可用性),所以Broker最低需要3个。
警告
Warning
The LevelDB store has been deprecated and is no longer supported or recommended for use. The recommended store is KahaDB
意思是:
LevelDB存储已被弃用,不再支持或建议使用。推荐的使用KahaDB
参考:http://activemq.apache.org/replicated-leveldb-store.html
4.3.3、KahaDB
KahaDB 是一个基于文件的持久性数据库,它位于使用它的消息代理本地。它已针对快速持久性进行了优化。它是自ActiveMQ 5.4以来的默认存储机制
。KahaDB 使用更少的文件描述符并提供比其前身AMQ 消息存储更快的恢复。
配置
<broker brokerName="broker">
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
</broker>
KahaDB 属性
属性 | 默认 | 注释 |
---|---|---|
archiveCorruptedIndex | false | 如果true ,启动时发现的损坏索引将被存档(不删除)。 |
archiveDataLogs | false | 如果true , 会将消息数据日志移动到存档目录而不是将其删除。 |
checkForCorruptJournalFiles | false | 如果true ,将在启动时检查损坏的日志文件并尝试恢以上是关于LinuxWindows安装ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章 |