ActiveMq

Posted mingyuewu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMq相关的知识,希望对你有一定的参考价值。

ActiveMq

一、MQ简介

消息队列种类有:Kafka RabbitMQ RocketMq ActiveMQ。ActiveMq是Apache下的一个开源消息队列。

RabbitMq消息堆积支持不好,erlang语言,性能一般
ActiveMq老牌MQ,性能不及后起之秀
RocketMq实时性好,文档多
Kafka兼容性,性能最好,延迟高,不适合在线业务(处理消息时先攒一些,再一起处理)


producer: 纯 Java 语言编写的 JMS 接口实现(比如 ActiveMQ 就是)。Java Message Service,java消息服务,是 Java 平台上有关面向 MOM 的技术规范,旨在通过提供标准的产生、发送、接收和处理消息的 API 简化企业应用的开发,类似于 JDBC 和关系型数据库通信方式的抽象。

二、参考文档

官网https://activemq.apache.org/
文档目录https://activemq.apache.org/components/classic/documentation
安装https://activemq.apache.org/getting-started
低版本下载地址https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/

三、软件安装路径和端口规划

端口ActiveMQ’s default port is 61616 监控网站端口:8161
家目录/usr/local/activemq
日志文件/usr/local/activemq/data/activemq.log
pid文件/usr/local/activemq/data/activemq.pid
配置文件/usr/local/activemq/conf/activemq.xml
消息默认持久化存储位置(日志存储)/usr/local/activemq/data/kahadb

四、安装步骤

1 下载安装包

安装源码包,编译的时候maven报错,不能解决依赖关系,一些jar包找不着,所以下载了二进制包,解压即可使用。

低版本:https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/

最新版本:https://activemq.apache.org/components/classic/download/

2 解压

tar zxvf activemq.x.x-src.tar.gz
cd [activemq_install_dir]

3 启动activemq

#run ActiveMQ as a foregroud process:
cd [activemq_install_dir]/bin
./activemq console
#run ActiveMQ as a daemon process:
cd [activemq_install_dir]/bin
./activemq start
## 其他命令
 ./activemq  status
 ./activemq  stop
 ./activemq  restart

4 验证运行情况

(1 打开管理网站

url http://127.0.0.1:8161/admin/
用户名,密码都是admin

(2 查看日志文件或控制台输出

tail  -200  /usr/local/activemq/data/XXXX.log
#如果运行成功,有如下字段
Apache ActiveMQ 5.11.1 (localhost, ID:ntbk11111-50816-1428933306116-0:1) started | org.apache.activemq.broker.BrokerService | main

(3 查看端口

netstat  -antp  | grep  61616

(4 ./activemq status

[root@node01 bin]# pwd
/usr/local/activemq/bin
[root@node01 bin]# ./activemq status
INFO: Loading '/usr/local/activemq//bin/env'
INFO: Using java '/usr/local/jdk1.8.0_231/bin/java'
ActiveMQ is running (pid '22261')

5 关闭activemq

cd [activemq_install_dir]/bin
./activemq stop

五、JMS概念理解

1 JMS ( java message service )java 消息服务

java和MQ通信的API叫JMS,java和数据库通信的API叫JDBC

2 JMS消息模型

(1)点对点模型(P2P) queue队列模型

(2)发布订阅模式(Publish/Subcribe),也叫Topic主题模型

包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。

先订阅主题,再来发送消息

3 JMS API图解

六、activemq消息持久化

参考视频;https://www.bilibili.com/video/BV1vJ41177j1?p=26&spm_id_from=pageDriver

(1 memory

消息存储基于内存的消息存储方式

(2 基于日志消息存储方式

KahaDB是ActiveMQ默认日志存储方式,它提供了容量的提升和恢复能力。

[root@node01 kahadb]# pwd
/usr/local/activemq/data/kahadb
[root@node01 kahadb]# ll
total 40
-rw-r--r-- 1 root root 33554432 Aug 10 17:29 db-1.log
-rw-r--r-- 1 root root    12288 Aug 11 13:21 db.data
-rw-r--r-- 1 root root    12304 Aug 11 13:21 db.redo
-rw-r--r-- 1 root root        8 Aug 10 17:29 lock

(3 基于JDBC的消息存储方

数据存储与数据库中(如mysql)



(4 配置 ActiveMq的持久化

在java代码中修改activemq持久化配置 application.yml
在服务器端修改activemq配置文件activemq.xml中的持久化配置器PersistenceAdopter

持久化到日志数据库中

持久化到JDBC数据库中

七、activemq配置文件

<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
        配置broker,name,dataDirectory
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:
                         http://activemq.apache.org/slow-consumer-handling.html
                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
		   配置消息持久化策略,内存、日志kahadb、JDBC数据库
            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!-- 限制broker对系统资源的使用
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>

以上是关于ActiveMq的主要内容,如果未能解决你的问题,请参考以下文章

wildfly 实践5 ---分布式服务中的JMS服务访问

ActiveMQ的安全机制使用及其源代码分析 [转]

ActiveMQ——activemq的使用java代码实例

ActiveMQ入门系列二:入门代码实例(点对点模式)

ActiveMQ入门案例-生产者代码实现

ActiveMQ queue 代码示例