ActiveMQ知识概括

Posted GeorgeLin98

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ知识概括相关的知识,希望对你有一定的参考价值。

ActiveMQ知识概括

ActiveMQ简介

ActiveMQ安装:

  • 安装步骤:
    ①去ActiveMQ官网下载压缩包。
    ②解压压缩包到指定目录。
    ③启动ActiveMQ:service activemq start
    ④查看activemq状态:service activemq status
    ⑤关闭activemq服务:service activemq stop
  • 启动时指定日志输出文件:
    ①activemq日志默认的位置是在:%activemq安装目录%/data/activemq.log
    ②这是我们启动时指定日志输出文件:service activemq start > /usr/local/raohao/activemq.log
  • 查看程序启动是否成功的3种方式(通用):
    ①ps -ef | grep activemq
    ②netstat -anp | grep 61616
    ③lsof -i: 61616

ActiveMQ控制台:

  • 访问activemq管理页面地址:http://IP地址:8161/。默认的用户名和密码是admin/admin。
  • 备注:
    ①ActiveMQ采用61616端口提供JMS服务。
    ②ActiveMQ采用8161端口提供管理控制台服务。
  • 默认程序连接activemq(JMS服务)是不需要密码的,为了安装起见,一般都会设置密码,提高安全性。
  • ActiveMQ控制台之队列:
    ①Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
    ②Number Of Consumers:消费者数量,消费者端的消费者数量。
    ③Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
    ④Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。
  • ActiveMQ控制台之主题:
  • ActiveMQ控制台之订阅者:

Java实现ActiveMQ

pom.xml导入依赖:

<!--  activemq  所需要的jar 包-->
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.9</version>
</dependency>
<!--  activemq 和 spring 整合的基础包 -->
<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
</dependency>

JMS编码总体规范:

  • 架构:
  • JMS开发的基本步骤:
    ①创建一个connection factory
    ②通过connection factory来创建JMS connection
    ③启动JMS connection
    ④通过connection创建JMS session
    ⑤创建JMS destination
    ⑥创建JMS producer或者创建JMS message并设置destination
    ⑦创建JMS consumer或者是注册一个JMS message listener
    ⑧发送或者接受JMS message(s)
    ⑨关闭所有的JMS资源(connection, session, producer, consumer等)

Destination简介:

  • Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。
  • Destination分为两种:队列和主题。
    在点对点的消息传递域中,目的地被称为队列(queue)
    在发布订阅消息传递域中,目的地被称为主题(topic)
    ③下图介绍:

队列消息(Queue)总结:

  • 两种消费方式:
    ①同步阻塞方式(receive):订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
    ②异步非阻塞方式(监听器onMessage()):订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
  • 队列的特点:
    ①每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
    ②消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
    ③消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
  • 消息消费情况:
    ①情况1:只启动消费者1。结果:消费者1会消费所有的数据。
    ②情况2:先启动消费者1,再启动消费者2。结果:消费者1消费所有的数据。消费者2不会消费到消息。
    ③情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。结果:消费者1和消费者2平摊了消息。各自消费3条消息。
    ④疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。

主题消息(Topic)介绍:

  • 在发布订阅消息传递域中,目的地被称为主题(topic)
  • 发布/订阅消息传递域的特点如下:
    ①生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
    ②生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
    ③生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
    ④默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅

tpoic和queue对比:

比较项目Topic模式队列Queue模式队列
工作模式.“订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息“负载均衡"模式,如果当前没有消费者,消息也不会云弃;如果有多个消费者,那么—条消息也只会发送始其中一个消费者,并且要求消费者ack信息
有无状态无状态Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ—般保存在SAMQ_HOME\\datakr-storeldata下面。也可以配置成DB存储。
传递完整性如果没有订阅者,消息会被丢弃消息不会云弃
处理效率由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异由于—条消息只发送给—个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的

JMS规范与落地

JMS是什么:

  • JMS是Java消息服务
  • Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

JMS的组成结构和特点:

消息头:

  • JMS的消息头有哪些属性:
    ①JMSDestination:消息目的地
    ②JMSDeliveryMode:消息持久化模式
    ③JMSExpiration:消息过期时间
    ④JMSPriority:消息的优先级
    ⑤JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。
  • 说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。

消息体:

  • 封装具体的消息数据
  • 5种消息体格式:
    TextMessage——普通字符串消息,包含一个string
    MapMessage——一个Map类型的消息,key为string类型,而值为Java的基本类型
    ③BytesMessage——二进制数组消息,包含一个byte[]
    ④StreamMessage——Java数据流消息,用标准流操作来顺序的填充和读取。
    ⑤ObjectMessage——对象消息,包含一个可序列化的Java对象
  • 发送和接受的消息体类型必须一致对应

消息属性:

  • 如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
  • 他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
  • 下图是设置消息属性的API:set对应类型Property(String name,对应类型 value)

JMS的可靠性:

  • PERSISTENT:持久性
  • Transaction:事务
  • Acknowledge:签收

消息的持久化:

  • 什么是持久化消息?
    ①保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
    ②我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题,如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。
  • 参数设置说明:
    ①非持久:非持久化:当服务器宕机,消息不存在。
    messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
    ②持久:持久化:当服务器宕机,消息依然存在。
    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
    Queue默认是持久。
  • 持久的Queue:持久化消息这是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
  • 持久的Topic:一定要先运行一次消费者,类似于像MQ注册,我订阅了这个主题。然后再运行主题生产者,无论消费着是否在线,都会接收到,在线的立即接收到,不在线的等下次上线把没接收到的接收。类似微信公众号订阅发布。

消息事务:

  • producer提交时的事务:
    ①false:只要执行send,就进入到队列中,关闭事务,那第2个签收参数的设置需要有效。
    ②true:先执行send再执行commit,消息才被真正提交到队列中,消息需要需要批量提交,需要缓冲处理。
  • consumer消费时的事务:
    ①false:activeMQ默认认为你执行了commit,消费了消息。
    ②true:只有执行了commit,activeMQ才认为你消费了消息,控制台的消费数才会上升。不执行commit的话,会重复消费消息!
  • 事务偏生产者/签收偏消费者!

消息签收:

  • 非事务:
    ①自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
    ②手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。
    ③允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
    ④事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。
  • 事务:
    ①由于消费者开启了事务,没有提交事务(就算手动签收也没用),服务器认为,消费者没有收到消息。
    ②生产事务开启,只有commit后才能将全部消息变为已消费。
  • 签收和事务的关系:
    在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。
    非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
    ③消费者事务开启,只有commit后才能将全部消息变为已消费。
    ④事务偏向生产者,签收偏向消费者。也就是说生产者使用事务更好点,消费者使用签收机制更好点。

JMS的点对点总结:

  • 点对点模型是基于队列的,生产者发送消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
    ①如果在Session关闭时有部分消息被收到但还没有被签收(acknowledge),那当消费者下次连接到相同的队列时,这些消息还会被再次接收。
    ②队列可以长久的保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优势

JMS的发布订阅总结:

  • 非持久订阅:
    ①非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
  • 持久订阅:
    ①客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息当持久订阅状态下,不能恢复或重新派送一个未签收的消息。持久订阅才能恢复或重新派送一个未签收的消息。
  • 用哪个?
    ①当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅。

ActiveMQ的broker

简介:

  • 相当于一个ActiveMQ服务器实例说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。

嵌入式Broker:

  • POM.XML:
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.11</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.15</version>
</dependency>                          
  • 主启动类:
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker     
	public static void main(String[] args) throws Exception         
		//ActiveMQ也支持在vm中通信基于嵌入的broker        
		BrokerService brokerService = new BrokerService();        	
		brokerService.setPopulateJMSXUserID(true);        
		brokerService.addConnector("tcp://127.0.0.1:61616");        
		brokerService.start();    
	
 
  • 和Linux上的ActiveMQ是一样的,Broker相当于一个Mini版本的ActiveMQ

Spring,SpringBoot整合ActiveMQ

Spring整合ActiveMQ:

  • Maven修改,需要添加Spring支持JMS的包:
 <!-- activemq核心依赖包  -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.10.0</version>
    </dependency>
    <!--  嵌入式activemq的broker所需要的依赖包   -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.1</version>
    </dependency>
    <!-- activemq连接池 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.10</version>
    </dependency>
    <!-- spring支持jms的包 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.2.1.RELEASE</version>
    </dependency>
    <!--spring相关依赖包-->
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>4.15</version>
    </dependency>
  • Spring配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">

    <!--  开启包的自动扫描  -->
    <context:component-scan base-package="com.activemq.demo"/>
    <!--  配置生产者  -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!--      正真可以生产Connection的ConnectionFactory,由对应的JMS服务商提供      -->
            <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://192.168.10.130:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>

    <!--  这个是队列目的地,点对点的Queue  -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--    通过构造注入Queue名    -->
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>

    <!--  这个是队列目的地,  发布订阅的主题Topic-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>

    <!--  Spring提供的JMS工具类,他可以进行消息发送,接收等  -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--    传入连接工厂    -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--    传入目的地    -->
        <property name="defaultDestination" ref="destinationQueue"/>
        <!--    消息自动转换器    -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>
  • 队列(Queue):
---------------生产者------------------
@Service
public class SpringMQ_Producer     
	private JmsTemplate jmsTemplate;    
	@Autowired    
	public void setJmsTemplate(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
		SpringMQ_Producer springMQ_producer = applicationContext.getBean(SpringMQ_Producer.class);        
		springMQ_producer.jmsTemplate.send(
			session -> session.createTextMessage("***Spring和ActiveMQ的整合case111....."));        
		System.out.println("********send task over");    
 

---------------消费者------------------
@Service
public class SpringMQ_Consumer     
	private JmsTemplate jmsTemplate;    
	@Autowired    
	public void setJmsTemplate(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
	ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
	SpringMQ_Consumer springMQ_consumer = applicationContext.getBean(SpringMQ_Consumer.class);        
	String returnValue = (String) springMQ_consumer.jmsTemplate.receiveAndConvert();        
	System.out.println("****消费者收到的消息:   " + returnValue);    
 
  • 主题(Topic):
---------------生产者------------------
@Service
public class SpringMQ_Topic_Producer     
	private JmsTemplate jmsTemplate;    
	public SpringMQ_Topic_Producer(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
	ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
	SpringMQ_Topic_Producer springMQ_topic_producer = applicationContext.getBean(SpringMQ_Topic_Producer.class);        
	//直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了        
	springMQ_topic_producer.jmsTemplate.setDefaultDestination(((Destination) applicationContext.getBean("destinationTopic")));        
	springMQ_topic_producer.jmsTemplate.send(
		session -> session.createTextMessage("***Spring和ActiveMQ的整合TopicCase111.....")
	);    
	
 

---------------消费者------------------
@Service
public class SpringMQ_Topic_Consumer     
	private JmsTemplate jmsTemplate;    
	public SpringMQ_Topic_Consumer(JmsTemplate jmsTemplate)         
		this.jmsTemplate = jmsTemplate;    
	    
	public static void main(String[] args)         
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");        
		SpringMQ_Topic_Consumer springMQConsumer = applicationContext.getBean(SpringMQ_Topic_Consumer.class);        
		//直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了        
		springMQConsumer.jmsTemplate.setDefaultDestination(((Destination) 
		applicationContext.getBean("destinationTopic")));        
		String returnValue = (String) springMQConsumer.jmsTemplate.receiveAndConvert();        
		System.out.println("****消费者收到的消息:   " + returnValue);    
	
 
  • 在Spring里面实现消费者不启动,直接通过配置监听完成:
<!--/配置监听程序-->
<bean id="jmscontainer" class="org.springframework.jms.1listener.DefaultlessageListenerContainer">
	<property name="connectionFactory" ref="jmsFactory" />
	<property name="destination" ref="destinationTopic" />
	<!-- public class MyMessageListener implements MessageListener-->
	<property name="messageListener" ref="myMessageListener" />
</bean>
//实现MessageListener的类,需要把这个类交给xml配置里面的DefaultMessageListenerContainer管理 
@Component
public class MyMessageListener implements MessageListener     
	@Override    
	public void onMessage(Message message)         
		if (message instanceof TextMessage)             
			TextMessage textMessage = (TextMessage) message;            
			try                 
				System.out.println("消费者收到的消息" + textMessage.getText());            
			 catch (JMSException e)                 
				e.printStackTrace();            
			        
		    
	
 

SpringBoot整合ActiveMQ:

  • POM文件:
<!--spring boot整合activemq的jar包-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <version>2.1.5.RELEASE</version>
</dependency>
  • YML文件:
# web占用的端口
server:
  port: 7777

spring:
  activemq:
    # activemq的broker的url
    broker-url: tcp://192.168.17.3:61616
    # 连接activemq的broker所需的账号和密码
    user: admin
    password: admin
  jms:
    # 目的地是queue还是topic, false(默认) = queue    true =  topic
    pub-sub-domain: false

# 自定义队列名称。这只是个常量
myQueueName: springboot-activemq-queue 
# 自定义主题名称。这只是个常量
myTopicName: springboot-activemq-topic 
  • 配置bean:
@Component
@EnableJms 
//开启Springboot的Jms
public class ConfigBean     
	@Value("myQueueName")    
	private String myQueueName;    
	@Bean    
	public ActiveMQQueue queue()         
		//创建一个ActiveMQQueue        
		return new ActiveMQQueue(myQueueName);    
	
	
	@Value("$myTopicName")    
	private String topicName;    
	@Bean    
	public ActiveMQTopic activeMQTopic()    
		//创建一个ActiveMQTopic
		return new ActiveMQTopic(topicName);    
	
 
  • 队列(queue):
-------------生产者-------------
@Component
public class Queue_Produce 
    // JMS模板
    @Autowired
    private JmsMessagingTemplate  jmsMessagingTemplate ;
    // 这个是我们配置的队列目的地
    @Autowired
    private Queue queue ;
    
    // 发送消息
    public void produceMessage()
        // 一参是目的地,二参是消息的内容
        jmsMessagingTemplate.convertAndSend(queue,"****"+ UUID.randomUUID().toString().substring(0,6));
    
    
    // 定时任务。每3秒执行一次。非必须代码,仅为演示。
    @Scheduled(fixedDelay = 3000)
    public void produceMessageScheduled()
        produceMessage();
    

-------------消费者-------------
@Component
public class Queue_consummer 
    // 注册一个监听器。destination指定监听的主题。
    @JmsListener(destination = "$myqueue")
    public void receive(TextMessage textMessage) throws  Exception
        System.out.println(" ***  消费者收到消息  ***"+textMessage.getText());
    

  • 主题(topic):
-------------生产者-------------
@Component
public class Topic_Produce 
    @Autowired
    private JmsMessagingTemplate  jmsMessagingTemplate ;
    @Autowired
    private Topic  topic ;

    @Scheduled(fixedDelay = 3000)
    public void produceTopic()
        jmsMessagingTemplate.convertAndSend(topic,"主题消息"+ UUID.randomUUID().toString().substring(0,6));
    

-------------消费者-------------
@Component
public class Topic_Consummer 

    @JmsListener(destination = "$mytopic")
    public void receive(TextMessage textMessage) throws  Exception
        System.out.println("消费者受到订阅的主题:"+textMessage.getText());
    

  • 持久化订阅:
-------------配置Bean-------------
/** 
* 设置持久化订阅 
* 配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅 
*/
@Component
@EnableJms
public class ActiveMQConfigBean     
	@Value("$spring.activemq.broker-url")    
	private String brokerUrl;    
	
	@Value("$spring.activemq.user")    
	private String user;    
	
	@Value("$spring.activemq.password")    
	private String password;    
	
	public ConnectionFactory connectionFactory()        
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();        
		connectionFactory.setBrokerURL(brokerUrl);        
		connectionFactory.setUserName(user);        
		connectionFactory.setPassword(password);        
		return connectionFactory;    
	    
	
	@Bean(name = "jmsListenerContainerFactory")    
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory()         	
		DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = 
		new DefaultJmsListenerContainerFactory();        
		defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());        
		defaultJmsListenerContainerFactory.setSubscriptionDurable(true);        
		defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");        
		return defaultJmsListenerContainerFactory;    
	
 
-------------消费者-------------
@Component
public class Topic_Consumer     
	//需要在监听方法指定连接工厂    
	@JmsListener(destination = "$myTopicName",
	containerFactory = "jmsListenerContainerFactory")    
	public void consumer(TextMessage textMessage) throws JMSException         
		System.out.println("订阅着收到消息:    " + textMessage.getText());    
	
 

SpringBoot整合ActiveMQ之Queue与Topoic并存:

  • application.properties中定义相关配置项:
spring.jms.pub-sub-domain=true
spring.activemq.broker-url=tcp://172.18.1.18:61616
#spring.activemq.user=按实际情况配置
#spring.activemq.password=按实际情况配置  
spring.activemq.in-memory=false
spring.activemq.pool.enabled=false
spring.activemq.pool.maxConnections=2
spring.activemq.pool.expiryTimeout=0
spring.activemq.pool.idleTimeout=30000
spring.activemq.packages.trust-all=true
  • 定义配置类:
@Configuration
@EnableJms
public class JmsConfiguration 
    // topic模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) 
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    
    
    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) 
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    

  • 定义监听器实现:
@Service
public class MQConsumerService 
    
    @JmsListener(destination = "portal.admin.topic",containerFactory = "jmsListenerContainerTopic") // 监听

以上是关于ActiveMQ知识概括的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ知识概括

ActiveMQ_5死信队列

我们如何分析 Activemq 死信队列中的消息

Spring JMS 和 ActiveMQ 在哪里查看死信队列中的消息

创建自定义死信队列

消息中间件MQ知识概括