LinuxWindows安装ActiveMQ

Posted 用生命研发技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LinuxWindows安装ActiveMQ相关的知识,希望对你有一定的参考价值。

一、下载

下载地址:https://activemq.apache.org/components/classic/download/

选择相应的系统版本

二、Linux安装

2.1、下载方式

  • 下载tar.gz包到本地,然后上传到服务器
  • 通过wget下载

安装路径:/opt/home/mq

## 下载
wget https://dlcdn.apache.org/activemq/5.16.3/apache-activemq-5.16.3-bin.tar.gz


## 复制到其它服务器
scp -r /opt/home/mq/apache-activemq-5.16.3-bin.tar.gz root@192.168.0.5:/opt/home/mq/

## 解压
tar -zxvf apache-activemq-5.16.3-bin.tar.gz

## 切换到apache-activemq-5.16.3/bin/目录下
cd apache-activemq-5.16.3/bin/

## 启动
./activemq start

## 停止
./activemq stop

## 重启
./activemq restart

## 查看状态
./activemq status


## 检查端口是否使用
netstat -lnp | grep 8161
netstat -lnp | grep 61616

2.2、测试

浏览器访问:http://192.168.0.3:8161/

发现无法访问此网站,防火墙放行端口也无法访问

2.3、配置

这是因为 Activemq 默认只能当前服务器访问,如果需要外部访问,需要修改jetty.xml文件

## 修改apache-activemq-5.16.3/conf/jetty.xml文件
vim /opt/home/mq/apache-activemq-5.16.3/conf/jetty.xml

## 找到id为jettyPort的bean 将其中的127.0.0.1 改为 0.0.0.0
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
</bean>

## 重启ActiveMQ
./activemq restart

再通过浏览器访问:http://192.168.0.3:8161/
默认用户名/密码:admin/admin

三、Windows安装

下载解压后,进入bin\\win64下,双击activemq.bat,启动

访问:http://127.0.0.1:8161
选择Manage ActiveMQ broker(管理ActiveMQ代理),输入账号和密码登陆。
默认用户名/密码:admin/admin

四、ActiveMQ集群

集群分为两种方式:

  • 伪集群:集群节点都搭在一台机器上

  • 真集群:集群节点分布在多台机器上

4.1、为什么使用集群

  • 实现高可用,以排除单点故障引起的服务中断。

  • 实现负载均衡,以提升效率为更多的客户提供服务。

4.2、Broker Clusters 部署

Broker-Cluster的部署方式就可以解决负载均衡的问题。Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue,保证消息同步。

各个broker进行消息同步使用的是NetworkConnection (网络连接器),主要用于配置各个broker之间的网络通讯方式,用于服务器传递信息。 分为静态连接器和动态连接器。
静态连接器

<networkConnectors>
    <networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/> 
</networkConnectors>

动态连接器

<!-- 网络连接器 -->
<networkConnectors>
    <networkConnector uri="multicast://default"/> 
</networkConnectors>
<!-- 传输连接器 -->
<transprotConnectors>
    <transprotConnector uri="tcp://localhost:0" discoveryUri="multicast://default" />
</transprotConnectors>

静态连接器过于局限,动态连接器可随意扩展服务器连接。

4.3、Master Slave 部署(主从)

通过部署多个broker实例,选举产生一个master和多个slave,master宕机后由slave接管服务来达到高可用性。Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker Cluster的部署方式刚好可以解决负载均衡的问题。一般两者结合使用。

这里主要介绍2种配置方案

  • Shared File System Master slave 模式
  • JDBC Store Master Slave 模式

4.3.1、Share storage master slave(共享存储)

此模式中Master和Slave的数据是共享的(相当于共享同一个数据库),当master失效后,slave会自动接管服务,无需手动进行数据的copy与同步,因为master存储数据之后,这些数据在任何时候对slave都是可见的。
master与slave之间,通过共享文件的“排他锁”或者分布式排他锁(ZooKeeper)来决定Broker的状态与角色,获取锁权限的Broker作为master,其它的Broker则作为slave。如果master失效,它必将失去锁权限,那么其它的slave将通过锁竞争来选举新master,没有获取锁权限的Broker作为slave,并等待锁的释放(间歇性尝试获取锁)。

1、Shared File System Master Slave模式(只适合单台主机部署,不适合多台主机部署)

这种方式是最常用的模式,架构简单,可靠实用。我们只需要一个SAN文件系统即可。使用文件系统来共享数据文件,多个Broker共享同一个文件系统。

<!-- 
	假设在本地启动两个broker,配置文件activemq.xml 里面的持久化文件目录都设置成同一个即可。  
	这里默认使用的kahaDB ,你也可以用levelDB,不推荐AMQDB  
-->
<persistenceAdapter>  
     <kahaDB directory=/opt/home/mq/apache-activemq-5.16.3/data/shared_kahadb" />  
</persistenceAdapter>

2、JDBC Store Master Slave模式(适合多台主机部署)
数据存储用的是数据库(mysql/Oracle等),相对于日志文件而言,JDBC Store通常认为是低效的。配置如下

<broker brokerName="broker-locahost-0">  
	...
    <persistenceAdapter>
		 <!--  这个是可以自动生成表结构的-->
		<jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
    </persistenceAdapter>
</broker>  
 <!-- 配置jdbc数据库连接 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
	<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
	<property name="url" value="jdbc:mysql://192.168.0.3:3306/activemq?relaxAutoCommit=true"/>
	<property name="username" value="root"/>
	<property name="password" value="root"/>
	<property name="poolPreparedStatements" value="true"/>
</bean>

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
	<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
	<property name="url" value="jdbc:mysql://123.57.80.91:3306/activemq?relaxAutoCommit=true"/>
	<property name="username" value="root"/>
	<property name="password" value="Java_521"/>
	<property name="poolPreparedStatements" value="true"/>
</bean>

下载 mysql-connector-java-5.1.47.jar放到/opt/home/mq/apache-activemq-5.16.3/lib目录下
新建activemq数据库,

-- 如果存在数据库删除
DROP DATABASE IF EXISTS `activemq`;
-- 创建数据库
CREATE DATABASE `activemq` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';

-- 使用activemq数据库
USE activemq;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for activemq_acks
-- ----------------------------
DROP TABLE IF EXISTS `activemq_acks`;
CREATE TABLE `activemq_acks`  (
  `CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `SUB_DEST` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  `CLIENT_ID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `SUB_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `SELECTOR` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  `LAST_ACKED_ID` bigint(20) NULL DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT 5,
  `XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`CONTAINER`, `CLIENT_ID`, `SUB_NAME`, `PRIORITY`) USING BTREE,
  INDEX `ACTIVEMQ_ACKS_XIDX`(`XID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of activemq_acks
-- ----------------------------

-- ----------------------------
-- Table structure for activemq_lock
-- ----------------------------
DROP TABLE IF EXISTS `activemq_lock`;
CREATE TABLE `activemq_lock`  (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) NULL DEFAULT NULL,
  `BROKER_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`ID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of activemq_lock
-- ----------------------------
INSERT INTO `activemq_lock` VALUES (1, NULL, NULL);

-- ----------------------------
-- Table structure for activemq_msgs
-- ----------------------------
DROP TABLE IF EXISTS `activemq_msgs`;
CREATE TABLE `activemq_msgs`  (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  `MSGID_PROD` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  `MSGID_SEQ` bigint(20) NULL DEFAULT NULL,
  `EXPIRATION` bigint(20) NULL DEFAULT NULL,
  `MSG` blob NULL,
  `PRIORITY` bigint(20) NULL DEFAULT NULL,
  `XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`ID`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_MIDX`(`MSGID_PROD`, `MSGID_SEQ`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_CIDX`(`CONTAINER`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_EIDX`(`EXPIRATION`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_PIDX`(`PRIORITY`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_XIDX`(`XID`) USING BTREE,
  INDEX `ACTIVEMQ_MSGS_IIDX`(`ID`, `XID`, `CONTAINER`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of activemq_msgs
-- ----------------------------

SET FOREIGN_KEY_CHECKS = 1;

数据表说明:

ACTIVEMQ_MSGS
-- 消息表,缺省表名ACTIVEMQ_MSGS,Queue和Topic都存在里面
-- ID:自增的数据库主键
-- CONTAINER:消息的Destination
-- MSGID_PROD:消息发送者的主键
-- MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
-- EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
-- MSG:消息本体的Java序列化对象的二进制数据
-- PRIORITY:优先级,从0-9,数值越大优先级越高

ACTIVEMQ_ACKS
-- 用于存储订阅关系,如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
-- CONTAINER: 消息的Destination
-- SUB_DEST: 如果是使用Static集群, 这个字段会有集群其他系统的信息
-- CLIENT_ID : 每个订阅者都必须有一个唯一的客户端ID用以区分
-- SUB_NAME: 订阅者名称
-- SELECTOR: 选择器,可以选择只消费满足条件的消息, 条件可以用自定义属性实现, 可支持多属性AND和OR操作
-- LAST_ACKED_ID: 记录消费过的ID
-- ACTIVEMQ_ACKS表存储订阅的消息和最后一个持久订阅接收的消息ID

ACTIVEMQ_LOCK
-- 在集群环境中才有用, 只有一个Broker可以获得消息, 称为Master Broker, 其他的只能作为备份等待Master Broker不可用, 才可能成为下一个Master Broker, 这个表用于记录哪个Broker是当前的Master Broker.

启动activemq,如果启动报错,尝试将数据库编码改为latin1 或者ASCII

./activemq start

测试代码

public class Producer {
    private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
    private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage textMessage = session.createTextMessage("***发送消息顺序: " + i + "***发送消息优先级:" + i);
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);//设置延迟时间
           // producer.send(textMessage);
            producer.send(queue, textMessage);
        }
        System.out.println("消息发送完成");
        producer.close();
        session.close();
        connection.close();
    }
}
public class Consumer {
    private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
    private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String messageText = textMessage.getText();
                        System.out.println("***消费者接收消息: " + messageText);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                }
            }
        });

        //保证控制台不灭
        System.in.read();
    }
}

4.3.2、Replicated LevelDB Store(使用ZooKeeper协调多个Broker)

基于复制的LevelDB Store模式是ActiveMQ 5.9以后新增的特性,这是ActiveMQ全力打造的HA存储引擎。 一般都使用这种方式。由于利用zk 进行配置管理,可以方便监控,同时配置也相对简单。

使用ZooKeeper(集群)注册所有的ActiveMQ Broker。只有其中的一个Broker可以对外提供服务(也就是Master节点),其他的Broker处于待机状态,被视为Slave。如果Master因故障而不能提供服务,则利用ZooKeeper的内部选举机制会从Slave中选举出一个Broker充当Master节点,继续对外提供服务。

由于基于ZooKeeper(通常ZooKeeper集群至少需要3个实例,才能保证ZooKeeper本身的高可用性),所以Broker最低需要3个。
警告

Warning
The LevelDB store has been deprecated and is no longer supported or recommended for use. The recommended store is KahaDB
意思是:
LevelDB存储已被弃用,不再支持或建议使用。推荐的使用KahaDB
参考:http://activemq.apache.org/replicated-leveldb-store.html

4.3.3、KahaDB

KahaDB 是一个基于文件的持久性数据库,它位于使用它的消息代理本地。它已针对快速持久性进行了优化。它是自ActiveMQ 5.4以来的默认存储机制。KahaDB 使用更少的文件描述符并提供比其前身AMQ 消息存储更快的恢复。
配置

<broker brokerName="broker">
    <persistenceAdapter>
      <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
    </persistenceAdapter>
 </broker>

KahaDB 属性

属性默认注释
archiveCorruptedIndexfalse如果true,启动时发现的损坏索引将被存档(不删除)。
archiveDataLogsfalse如果true, 会将消息数据日志移动到存档目录而不是将其删除。
checkForCorruptJournalFilesfalse如果true,将在启动时检查损坏的日志文件并尝试恢

以上是关于LinuxWindows安装ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章

linux与linuxwindows之间文件共享的几种方式

ActiveMQ入门:认识并安装ActiveMQ(Windows下)

ActiveMQ入门系列二:入门代码实例(点对点模式)

Apache ActiveMQ实战-基本安装配置与消息类型

LinuxWindows常见端口号大全

NodeJS学习指南

(c)2006-2024 SYSTEM All Rights Reserved IT常识