ActiveMQ

Posted 从入门小白到小黑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ相关的知识,希望对你有一定的参考价值。

协议配置

ActiveMQ 支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM 这是activemq 的activemq.xml 中配置文件设置协议的地方

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon    nections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn    ections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection    s=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

默认是使用 openwire 也就是 tcp 连接
默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互


  1. NIO 协议为ActiveMQ 提供更好的性能
    适合NIO 使用的场景:
    1 当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO
    2 可能对于 Broker 有一个很迟钝的网络传输, NIO 的性能高于 TCP
    连接形式:
    nio://hostname:port?key=value
https://activemq.apache.org/configuring-version-5-transports.html

修改 activemq.xml 使之支持 NIO 协议:


        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
           <transportConnector name="auto+nio+ssl" uri="auto+nio+ssl://0.0.0.0:5671?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

ActiveMQ 的可持久化

将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。
官网 : http://activemq.apache.org/persistence
ActiveMQ 支持的消息持久化机制: 带赋值功能的 LeavelDB 、 KahaDB 、 AMQ 、 JDBC
持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失

AMQ 是文件存储形式,写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用
KahaDB : 5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力
KahaDB 的属性配置 : http://activemq.apache.org/kahadb
它使用一个事务日志和 索引文件来存储所有的地址

db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档
db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-.log 里的消息
一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。
db.free 存储空闲页 ID 有时会被清除
db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
lock 顾名思义就是锁
四类文件+一把锁 ==》 KahaDB

LeavelDB : 希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快
它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引
题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB

JDBC : 有一部分数据会真实的存储到数据库中
使用JDBC 的持久化,
①修改配置文件,默认 kahaDB
修改之前:

<persistenceAdapter>
       <kahaDB directory="$activemq.data/kahadb"/>  
 </persistenceAdapter>

修改之后:

        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-xql" createTablesOnStartup="false"/>
        </persistenceAdapter>


         <bean id="mysql-xql" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
                <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                <property name="url" value="jdbc:mysql://xxxxxxx:3307/activemq?relaxAutoCommit=true&amp;useSSL=false&amp;serverTimezone=GMT"/>
                <property name="username" value="root"/>
                <property name="password" value="xxxx"/>
                <property name="poolPreparedStatements" value="true"/>
        </bean>

④  让linux 上activemq 可以访问到 mysql ,之后产生消息。
    ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock   
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中
  
 点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除  
   但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。  


持久化消息是指:
MQ 所在的服务器down 了消息也不会丢失
持久化机制演化过程:
从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。
ActiveMQ 消息持久化机制有:
AMQ 基于日志文件
KahaDB 基于日志文件,5.4 之后的默认持久化
JDBC 基于第三方数据库
LevelDB : 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB
ReplicatedLevelDB Store 从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案

但是无论使用哪种持久化方式,消息的存储逻辑都一样

小结

  • 1> 引入消息队列后 如何保证高可用性
    持久化、事务、签收、 以及带复制的 Leavel DB + zookeeper 主从集群搭建
  • 2> 异步投递 Async send
    对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息
    activemq 支持同步异步 发送的消息,默认异步。当你设定同步发送的方式和 未使用事务的情况下发持久化消息,这时是同步的。
    如果没有使用事务,且发送的是持久化消息,每次发送都会阻塞一个生产者直到 broker 发回一个确认,这样做保证了消息的安全送达,但是会阻塞客户端,造成很大延时 。
    在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。

// 开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);

如何在投递快还可以保证消息不丢失 ?
异步发送消息丢失的情况场景是: UseAsyncSend 为 true 使用 producer(send)持续发送消息,消息不会阻塞,生产者会认为所有的 send 消息均会被发送到 MQ ,如果MQ 突然宕机,此时生产者端尚未同步到 MQ 的消息均会丢失 。
故 正确的异步发送方法需要接收回调
同步发送和异步发送的区别就在于 :
同步发送send 不阻塞就代表消息发送成功
异步发送需要接收回执并又客户端在判断一次是否发送

在代码中接收回调的函数 :

activeMQConnectionFactory.setUseAsyncSend(true);
    ……  
    
 for (int i = 1; i < 4 ; i++) 
         textMessage = session.createTextMessage("msg--" + i);
      textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--  orderr");
     String msgid = textMessage.getJMSMessageID();
            messageProducer.send(textMessage, new AsyncCallback() 
                @Override
                public void onSuccess() 
                    // 发送成功怎么样
                    System.out.println(msgid+"has been successful send ");
                

                @Override
                public void onException(JMSException e) 
                    // 发送失败怎么样
                    System.out.println(msgid+" has been failure send ");
                
            );
    

long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) 
    TextMessage textMessage = session.createTextMessage("delay msg--" + i);
    // 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);

    messageProducer.send(textMessage);




ActiveMQ知识概括

ActiveMQ知识概括

ActiveMQ简介

ActiveMQ安装:

  • 安装步骤:
    ①去ActiveMQ官网下载压缩包。
    ②解压压缩包到指定目录。
    ③启动ActiveMQ:service activemq start
    ④查看activemq状态:service activemq status
    ⑤关闭activemq服务:service activemq stop
  • 启动时指定日志输出文件:
    ①activemq日志默认的位置是在:%activemq安装目录%/data/activemq.log
    ②这是我们启动时指定日志输出文件:service activemq start > /usr/local/raohao/activemq.log
  • 查看程序启动是否成功的3种方式(通用):
    ①ps -ef | grep activemq
    ②netstat -anp | grep 61616
    ③lsof -i: 61616

ActiveMQ控制台:

  • 访问activemq管理页面地址:http://IP地址:8161/。默认的用户名和密码是admin/admin。
  • 备注:
    ①ActiveMQ采用61616端口提供JMS服务。
    ②ActiveMQ采用8161端口提供管理控制台服务。
  • 默认程序连接activemq(JMS服务)是不需要密码的,为了安装起见,一般都会设置密码,提高安全性。
  • ActiveMQ控制台之队列:
    ①Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
    ②Number Of Consumers:消费者数量,消费者端的消费者数量。
    ③Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
    ④Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。
  • ActiveMQ控制台之主题:
  • ActiveMQ控制台之订阅者:

Java实现ActiveMQ

pom.xml导入依赖:

<!--  activemq  所需要的jar 包-->
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.9</version>
</dependency>
<!--  activemq 和 spring 整合的基础包 -->
<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
</dependency>

JMS编码总体规范:

  • 架构:
  • JMS开发的基本步骤:
    ①创建一个connection factory
    ②通过connection factory来创建JMS connection
    ③启动JMS connection
    ④通过connection创建JMS session
    ⑤创建JMS destination
    ⑥创建JMS producer或者创建JMS message并设置destination
    ⑦创建JMS consumer或者是注册一个JMS message listener
    ⑧发送或者接受JMS message(s)
    ⑨关闭所有的JMS资源(connection, session, producer, consumer等)

Destination简介:

  • Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。
  • Destination分为两种:队列和主题。
    在点对点的消息传递域中,目的地被称为队列(queue)
    在发布订阅消息传递域中,目的地被称为主题(topic)
    ③下图介绍:

队列消息(Queue)总结:

  • 两种消费方式:
    ①同步阻塞方式(receive):订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
    ②异步非阻塞方式(监听器onMessage()):订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
  • 队列的特点:
    ①每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
    ②消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
    ③消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
  • 消息消费情况:
    ①情况1:只启动消费者1。结果:消费者1会消费所有的数据。
    ②情况2:先启动消费者1,再启动消费者2。结果:消费者1消费所有的数据。消费者2不会消费到消息。
    ③情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。结果:消费者1和消费者2平摊了消息。各自消费3条消息。
    ④疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。

主题消息(Topic)介绍:

  • 在发布订阅消息传递域中,目的地被称为主题(topic)
  • 发布/订阅消息传递域的特点如下:
    ①生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
    ②生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
    ③生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
    ④默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅

tpoic和queue对比:

比较项目Topic模式队列Queue模式队列
工作模式.“订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息“负载均衡"模式,如果当前没有消费者,消息也不会云弃;如果有多个消费者,那么—条消息也只会发送始其中一个消费者,并且要求消费者ack信息
有无状态无状态Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ—般保存在SAMQ_HOME\\datakr-storeldata下面。也可以配置成DB存储。
传递完整性如果没有订阅者,消息会被丢弃消息不会云弃
处理效率由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异由于—条消息只发送给—个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的

JMS规范与落地

JMS是什么:

  • JMS是Java消息服务
  • Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

JMS的组成结构和特点:

消息头:

  • JMS的消息头有哪些属性:
    ①JMSDestination:消息目的地
    ②JMSDeliveryMode:消息持久化模式
    ③JMSExpiration:消息过期时间
    ④JMSPriority:消息的优先级
    ⑤JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。
  • 说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。

消息体:

  • 封装具体的消息数据
  • 5种消息体格式:
    TextMessage——普通字符串消息,包含一个string
    MapMessage——一个Map类型的消息,key为string类型,而值为Java的基本类型
    ③BytesMessage——二进制数组消息,包含一个byte[]
    ④StreamMessage——Java数据流消息,用标准流操作来顺序的填充和读取。
    ⑤ObjectMessage——对象消息,包含一个可序列化的Java对象
  • 发送和接受的消息体类型必须一致对应

消息属性:

  • 如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
  • 他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
  • 下图是设置消息属性的API:set对应类型Property(String name,对应类型 value)

JMS的可靠性:

  • PERSISTENT:持久性
  • Transaction:事务
  • Acknowledge:签收

消息的持久化:

  • 什么是持久化消息?
    ①保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
    ②我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题,如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。
  • 参数设置说明:
    ①非持久:非持久化:当服务器宕机,消息不存在。
    messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
    ②持久:持久化:当服务器宕机,消息依然存在。
    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
    Queue默认是持久。
  • 持久的Queue:持久化消息这是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
  • 持久的Topic:一定要先运行一次消费者,类似于像MQ注册,我订阅了这个主题。然后再运行主题生产者,无论消费着是否在线,都会接收到,在线的立即接收到,不在线的等下次上线把没接收到的接收。类似微信公众号订阅发布。

消息事务:

  • producer提交时的事务:
    ①false:只要执行send,就进入到队列中,关闭事务,那第2个签收参数的设置需要有效。
    ②true:先执行send再执行commit,消息才被真正提交到队列中,消息需要需要批量提交,需要缓冲处理。
  • consumer消费时的事务:
    ①false:activeMQ默认认为你执行了commit,消费了消息。
    ②true:只有执行了commit,activeMQ才认为你消费了消息,控制台的消费数才会上升。不执行commit的话,会重复消费消息!
  • 事务偏生产者/签收偏消费者!

消息签收:

  • 非事务:
    ①自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
    ②手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。
    ③允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
    ④事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。
  • 事务:
    ①由于消费者开启了事务,没有提交事务(就算手动签收也没用),服务器认为,消费者没有收到消息。
    ②生产事务开启,只有commit后才能将全部消息变为已消费。
  • 签收和事务的关系:
    在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。
    非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
    ③消费者事务开启,只有commit后才能将全部消息变为已消费。
    ④事务偏向生产者,签收偏向消费者。也就是说生产者使用事务更好点,消费者使用签收机制更好点。

JMS的点对点总结:

  • 点对点模型是基于队列的,生产者发送消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
    ①如果在Session关闭时有部分消息被收到但还没有被签收(acknowledge),那当消费者下次连接到相同的队列时,这些消息还会被再次接收。
    ②队列可以长久的保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优势

JMS的发布订阅总结:

  • 非持久订阅:
    ①非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
  • 持久订阅:
    ①客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息当持久订阅状态下,不能恢复或重新派送一个未签收的消息。持久订阅才能恢复或重新派送一个未签收的消息。
  • 用哪个?
    ①当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅。

ActiveMQ的broker

简介:

  • 相当于一个ActiveMQ服务器实例说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。

嵌入式Broker:

  • POM.XML:
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.11</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.15</version>
</dependency>                          
  • 主启动类:
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker     
	public static void main(String[] args) throws Exception         
		//ActiveMQ也支持在vm中通信基于嵌入的broker        
		BrokerService brokerService = new BrokerService();        	
		brokerService.setPopulateJMSXUserID(true);        
		brokerService.addConnector("tcp://127.0.0.1:61616");        
		brokerService.start();    
	
 
  • 和Linux上的ActiveMQ是一样的,Broker相当于一个Mini版本的ActiveMQ

Spring,SpringBoot整合ActiveMQ

Spring整合ActiveMQ:

  • Maven修改,需要添加Spring支持JMS的包:
 <!-- activemq核心依赖包  -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.10.0</version>
    </dependency>
    <!--  嵌入式activemq的broker所需要的依赖包   -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.1</version>
    </dependency>
    <!-- activemq连接池 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.10</version>
    </dependency>
    <!-- spring支持jms的包 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.2.1.RELEASE</version>
    </dependency>
    <!--spring相关依赖包-->
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>4.15</version>
    </dependency>
  • Spring配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">

    <!--  开启包的自动扫描  -->
    <context:component-scan base-package="com.activemq.demo"/>
    <!--  配置生产者  -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!--      正真可以生产Connection的ConnectionFactory,由对应的JMS服务商提供      -->
            <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://192.168.10.130:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>

    <!--  这个是队列目的地,点对点的Queue  -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--    通过构造注入Queue名    -->
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>

    <!--  这个是队列目的地,  发布订阅的主题Topic-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>

    <!--  Spring提供的JMS工具类,他可以进行消息发送,接收等  -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--    传入连接工厂    -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--    传入目的地    -->
        <property name="defaultDestination" ref="destinationQueue"/>
        <!--    消息自动转换器    -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>
  • 队列(Queue):
---------------生产者------------------
@Service
public class SpringMQ_Producer     
	private JmsTemplate jmsTemplate;    
	@Autowired    
	public void setJmsTemplate(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
		SpringMQ_Producer springMQ_producer = applicationContext.getBean(SpringMQ_Producer.class);        
		springMQ_producer.jmsTemplate.send(
			session -> session.createTextMessage("***Spring和ActiveMQ的整合case111....."));        
		System.out.println("********send task over");    
 

---------------消费者------------------
@Service
public class SpringMQ_Consumer     
	private JmsTemplate jmsTemplate;    
	@Autowired    
	public void setJmsTemplate(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
	ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
	SpringMQ_Consumer springMQ_consumer = applicationContext.getBean(SpringMQ_Consumer.class);        
	String returnValue = (String) springMQ_consumer.jmsTemplate.receiveAndConvert();        
	System.out.println("****消费者收到的消息:   " + returnValue);    
 
  • 主题(Topic):
---------------生产者------------------
@Service
public class SpringMQ_Topic_Producer     
	private JmsTemplate jmsTemplate;    
	public SpringMQ_Topic_Producer(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
	ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
	SpringMQ_Topic_Producer springMQ_topic_producer = applicationContext.getBean(SpringMQ_Topic_Producer.class);        
	//直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了        
	springMQ_topic_producer.jmsTemplate.setDefaultDestination(((Destination) applicationContext.getBean("destinationTopic")));        
	springMQ_topic_producer.jmsTemplate.send(
		session -> session.createTextMessage("***Spring和ActiveMQ的整合TopicCase111.....")
	);    
	
 

---------------消费者------------------
@Service
public class SpringMQ_Topic_Consumer     
	private JmsTemplate jmsTemplate;    
	public SpringMQ_Topic_Consumer(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
		SpringMQ_Topic_Consumer springMQConsumer = applicationContext.getBean(SpringMQ_Topic_Consumer.class);        
		//直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了        
		springMQConsumer.jmsTemplate.setDefaultDestination(((Destination) 
		applicationContext.getBean("destinationTopic")));        
		String returnValue = (String) springMQConsumer.jmsTemplate.receiveAndConvert();        
		System.out.println("****消费者收到的消息:   " + returnValue);    
	
 
  • 在Spring里面实现消费者不启动,直接通过配置监听完成:
<!--/配置监听程序-->
<bean id="jmscontainer" class="org.springframework.jms.1listener.DefaultlessageListenerContainer">
	<property name="connectionFactory" ref="jmsFactory" />
	<property name="destination" ref="destinationTopic" />
	<!-- public class MyMessageListener implements MessageListener-->
	<property name="messageListener" ref="myMessageListener" />
</bean>
//实现MessageListener的类,需要把这个类交给xml配置里面的DefaultMessageListenerContainer管理 
@Component
public class MyMessageListener implements MessageListener     
	@Override    
	public void onMessage(Message message)         
		if (message instanceof TextMessage)             
			TextMessage textMessage = (TextMessage) message;            
			try                 
				System.out.println("消费者收到的消息" + textMessage.getText());            
			 catch (JMSException e)                 
				e.printStackTrace();            
			        
		    
	
 

SpringBoot整合ActiveMQ:

  • POM文件:
<!--spring boot整合activemq的jar包-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <version>2.1.5.RELEASE</version>
</dependency>
  • YML文件:
# web占用的端口
server:
  port: 7777

spring:
  activemq:
    # activemq的broker的url
    broker-url: tcp://192.168.17.3:61616
    # 连接activemq的broker所需的账号和密码
    user: admin
    password: admin
  jms:
    # 目的地是queue还是topic, false(默认) = queue    true =  topic
    pub-sub-domain: false

# 自定义队列名称。这只是个常量
myQueueName: springboot-activemq-queue 
# 自定义主题名称。这只是个常量
myTopicName: springboot-activemq-topic 
  • 配置bean:
@Component
@EnableJms 
//开启Springboot的Jms
public class ConfigBean     
	@Value("myQueueName")    
	private String myQueueName;    
	@Bean    
	public ActiveMQQueue queue()         
		//创建一个ActiveMQQueue        
		return new ActiveMQQueue(myQueueName);    
	
	
	@Value("$myTopicName")    
	private String topicName;    
	@Bean    
	public ActiveMQTopic activeMQTopic()    
		//创建一个ActiveMQTopic
		return new ActiveMQTopic(topicName);    
	
 
  • 队列(queue):
-------------生产者-------------
@Component
public class Queue_Produce 
    // JMS模板
    @Autowired
    private JmsMessagingTemplate  jmsMessagingTemplate ;
    // 这个是我们配置的队列目的地
    @Autowired
    private Queue queue ;
    
    // 发送消息
    public void produceMessage()
        // 一参是目的地,二参是消息的内容
        jmsMessagingTemplate.convertAndSend(queue,"****"+ UUID.randomUUID().toString().substring(0,6));
    
    
    // 定时任务。每3秒执行一次。非必须代码,仅为演示。
    @Scheduled(fixedDelay = 3000)
    public void produceMessageScheduled()
        produceMessage();
    

-------------消费者-------------
@Component
public class Queue_consummer 
    // 注册一个监听器。destination指定监听的主题。
    @JmsListener(destination = "$myqueue")
    public void receive(TextMessage textMessage) throws  Exception
        System.out.println(" ***  消费者收到消息  ***"+textMessage.getText());
    

  • 主题(topic):
-------------生产者-------------
@Component
public class Topic_Produce 
    @Autowired
    private JmsMessagingTemplate  jmsMessagingTemplate ;
    @Autowired
    private Topic  topic ;

    @Scheduled(fixedDelay = 3000)
    public void produceTopic()
        jmsMessagingTemplate.convertAndSend(topic,"主题消息"+ UUID.randomUUID().toString().substring(0,6));
    

-------------消费者-------------
@Component
public class Topic_Consummer 

    @JmsListener(destination = "$mytopic")
    public void receive(TextMessage textMessage) throws  Exception
        System.out.println("消费者受到订阅的主题:"+textMessage.getText());
    

  • 持久化订阅:
-------------配置Bean-------------
/** 
* 设置持久化订阅 
* 配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅 
*/
@Component
@EnableJms
public class ActiveMQConfigBean     
	@Value("$spring.activemq.broker-url")    
	private String brokerUrl;    
	
	@Value("$spring.activemq.user")    
	private String user;    
	
	@Value("$spring.activemq.password")    
	private String password;    
	
	public ConnectionFactory connectionFactory()        
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();        
		connectionFactory.setBrokerURL(brokerUrl);        
		connectionFactory.setUserName(user);        
		connectionFactory.setPassword(password);        
		return connectionFactory;    
	    
	
	@Bean(name = "jmsListenerContainerFactory")    
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory()         	
		DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = 
		new DefaultJmsListenerContainerFactory();        
		defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());        
		defaultJmsListenerContainerFactory.setSubscriptionDurable(true);        
		defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");        
		return defaultJmsListenerContainerFactory;    
	
 
-------------消费者-------------
@Component
public class Topic_Consumer     
	//需要在监听方法指定连接工厂    
	@JmsListener(destination = "$myTopicName",
	containerFactory = "jmsListenerContainerFactory")    
	public void consumer(TextMessage textMessage) throws JMSException         
		System.out.println("订阅着收到消息:    " + textMessage.getText());    
	
 

SpringBoot整合ActiveMQ之Queue与Topoic并存:

  • application.properties中定义相关配置项:
spring.jms.pub-sub-domain=true
spring.activemq.broker-url=tcp://172.18.1.18:61616
#spring.activemq.user=按实际情况配置
#spring.activemq.password=按实际情况配置  
spring.activemq.in-memory=false
spring.activemq.pool.enabled=false
spring.activemq.pool.maxConnections=2
spring.activemq.pool.expiryTimeout=0
spring.activemq.pool.idleTimeout=30000
spring.activemq.packages.trust-all=true
  • 定义配置类:
@Configuration
@EnableJms
public class JmsConfiguration 
    // topic模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) 
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    
    
    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) 
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    

  • 定义监听器实现:
@Service
public class MQConsumerService 
    
    @JmsListener(destination = "portal.admin.topic",containerFactory = "jmsListenerContainerTopic") // 监听

以上是关于ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章

《图解HTTP》阅读笔记

Java网络编程案例---聊天室

Linux 系统的网络基础_all

TCP/IP协议

TCP/IP 协议

干货TCP/IP协议中需要必知必会的十大问题~