消息队列RabbitMQ
Posted giao卤蛋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列RabbitMQ相关的知识,希望对你有一定的参考价值。
一、RabbitMQ
1.1现实问题
目前我们已经完成了商品和搜索系统的开发。我们思考一下,是否存在问题?
- 商品的原始数据保存在数据库中,增删改查都在数据库中完成。
- 搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新。
如果我们在后台修改了商品的价格,搜索页面依然是旧的价格,这样显然不对。该如何解决?
这里有两种解决方案:
- 方案1:每当后台对商品做增删改操作,同时要修改索引库数据
- 方案2:搜索服务对外提供操作接口,后台在商品增删改后,调用接口
以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立
原则。
所以,我们会通过另外一种方式来解决这个问题:消息队列
1.2消息队列(MQ)
1.2.1什么是消息队列
消息队列,即MQ,Message Queue。
消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
结合前面所说的问题:
- 商品服务对商品增删改以后,无需去操作索引库,只是发送一条消息,也不关心消息被谁接收。
- 搜索服务服务接收消息,去处理索引库。
如果以后有其它系统也依赖商品服务的数据,同样监听消息即可,商品服务无需任何代码修改。
1.2.2AMQP和JMS
MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
1.2.3常见的MQ产品
- ActiveMQ:基于JMS
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
- Kafka:分布式消息系统,高吞吐量
1.2.4 RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系统
官网: http://www.rabbitmq.com/
官方教程:http://www.rabbitmq.com/getstarted.html
消息中间件的两大规范:JMS,AMQP
RabbitMQ就是根据AMQP实现,也兼容JMS
提供的五种消息模型,第一种就是对队列的实现,后四种是我们发布订阅的变形实现
也就是遵循了两种实现,一个订阅的模式(发布订阅主体模式),还有一个队列模式点对点
假设我们有一个对象想要传出去,但它传输只支持流,我们把对象序列化成json,json就是一个字符串,把字符串以流的形式(byte)传输出去
spring boot无论整合那个消息中间件都非常简单,因为spring boot已经原生支持JMS,RabbitMQ
二、RabbitMQ概念
RabbitMQ简介:
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
核心概念
-
Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则有一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
-
Publisher:消息生产者,也是一个向交换器发布消息的客户端应用程序。
-
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有四种类型:direct(默认),fanout,topic,和headers,不同类型的Echange转发消息的策略有所区别
-
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走
-
Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange和Queue的绑定可以是多对多的绑定。
-
Connection:网络连接,比如一个TCP连接
-
Channel:信道,多路复用连接中的一条独立的双向绑定数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入信道的概念,以复用一条TCP连接。
-
Consumer:消费的消费者,表示一个从消息队列中取得消息的客户端应用程序
-
Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须是在连接时指定,RabbitMQ默认的是vhost是/。
-
Broker:表示消息队列服务器实体
原理如下:
三、Docker安装RabbitMQ
3.1安装
下载镜像:docker pull rabbitmq:management
创建实例并启动:
docker run -d --name rabbitmq --publish 5671:5671 \\
--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \\
rabbitmq:management
加上这个命令,以后虚拟机docker一启动,rabbitmq就启动
docker update rabbitmq --restart=always
注意:
4369 – erlang发现口
5672 --client端通信口
15672 – 管理界面ui端口
25672 – server间内部通信口
3.2测试
我们只要访问15672端口,就可以看到rabbitmq的管理端的登录页
默认账号密码guest
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
端口:
5672: rabbitMq的编程语言客户端连接端口
15672:rabbitMq管理界面端口
25672:rabbitMq集群的端口
四、RabbitMQ运行机制
AMQP中的消息路由由
- AMQP中消息的路由过程和Java开发熟悉的JMS存在一些差别,AMQP中增加了Exchange和Binding的角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而binding决定交换器的消息应该发送到那个队列。
Exchange类型
-
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、tanout、topic、headers。headers匹配AMQP消息的header而不是路由键,headers交换器和direct交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
direct和headers是JMS的点对点通信的实现 tanout和topic是发布订阅的实现 headers性能比较低下不建议使用
direct和fanout和topic三个交换机,交换机类型不同,路由到地方就不一样
五、RabbitMQ的图形用户界面使用
添加用户
如果不使用guest,我们也可以直接创建一个用户;
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
创建一个新的交换机
交换机必须与队列进行绑定才能工作
进入交换机可以看到
默认是没有任何绑定,我们也可以在里面进行消息传输
所以我们想要使用交换机首先绑定一个队列,绑定一个队列就要先创建一个队列
创建好队列现在将交换机与队列进行绑定(交换机也可以根交换机绑定,也可以跟队列绑定)
进行绑定
接下来我们根据如下图来在RabbitMQ里面创建所有的交换机,队列,以及绑定好他们之间的关系,可以进行测试下各个关系都是怎么进行工作的
首先先创建四个队列以便测试
接着创建交换机
使用该交换机绑定四个队列
进入队列可以看到
消息ready为0,消息总量total为0,未回复消息unacked也为0
想要发消息,首先将消息发送给交换机
下面我们是我们直接交换机的演示,就是路由键精确写什么,我们就路由到哪里。direct exchange
发送消息指定我们的路由 根据绑定交换机队列指定的路由不一样而进行开发
对路由为hhxy.news的消息队列进行发送消息
就可以发现hhxy.news中确实有消息发送 ready表示有一个消息准备好了但还没接收
如果想要查看消息队列中的消息进入hhxy.news队列里面,再进入get Message中
Ack Mode回复模式 Nack message requeue true我们把消息拿来,不告诉RabbitMQ我收到消息了,RabbitMQ就会把消息重新存放到队列里面,让别人拿到。
接着我们发现看完消息后,消息还在所以我们换一种类型查看消息
通过Automatic ack来获取消息
查看消息成功,当我们再次点击获取消息时会发现
就会发现消息已经不在队列中了
fanout exchange 广播型交换机
同样我们为该交换器绑定消息队列
发送消息查看消息队列谁能接收到消息
可以发现所有的消息队列都获取到数据了
可以说我们发消息无论我们的路由键是什么,我们的消息队列都会有消息
就算我们发消息不写路由键,所有消息队列都会收到
Topic exchange 主题型交换机
首先我们创建一个交换机topic
根据hhxy.#,#.news,#.emps的路由键来绑定交换机
#就代表可以有单词,可以没有 *就代表必须有
现在开始发送消息 观察具体哪一个消息队列获取到消息
假设我们发送一个路由键为hhxy.news的消息
消息队列为1的,当我发送了消息就加1如下图
因为hhxy.news符合路由键中四个要求所以所有的都会加1
我们再发送一个消息这次以.news结尾的路由键 hello.news
根据情况推论,hello.news只符合以上四种路由键中hhxyxueyuan.news的路由键*.news,所以应该只要hhxyxueyuan.news的消息队列会加1,我们测试一下结果如下
果然如同我们猜想,hhxyxueyuan.news消息队列加1了
可以看出topic exchange相当于模糊匹配
五、RabbitMQ整合
1,引入spring-boot-starter-amqp
2,application.yml配置
3,测试RabbitMQ
1,AmqpAdmin:管理组件
2,RabbitTemplate:消息发送处理组件
/**
* 使用RabbitMQ
* 1,引入了amqp场景 RabbitAutoConfiguration就会自动生效
* 2,给容器中自动配置了
* RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
* 所有的属性都是spring.rabbitmq
* @ConfigurationProperties(prefix = "spring.rabbitmq")
* public class RabbitProperties
*
*
* 3,给配置文件中配置spring.rabbitmq信息
* 4,@EnableRabbit:@EnableXxxxx 开启功能
* 5,监听消息:使用@RabbitListener:必须有@EnableRabbit
* @RabbitListener:类+方法上(监听哪些队列即可)
* RabbitHandler:标在方法上(重载区分不同的消息)
*
*/
给RabbitMQ放一些连接工厂,从里面获取连接
放入组件
配置application.yml可以查看
RabbitAutoConfiguration的代码中 RabbitProperties
配置文件我们写在application.properties中
spring.rabbitmq.host=192.168.172.128
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
其他的暂时不写,因为RabbitProperties.class中默认定义了很多
现在开始测试使用代码来创建exchange,queue,binding
启动类上要添加注解@EnableRabbit
/**
* 使用RabbitMQ
* 1,引入了amqp场景 RabbitAutoConfiguration就会自动生效
*
* 2,给容器中自动配置了
* RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
* 所有的属性都是spring.rabbitmq
* @ConfigurationProperties(prefix = "spring.rabbitmq")
* public class RabbitProperties
*
*
* 3,给配置文件中配置spring.rabbitmq信息
* 4,@EnableRabbit:@EnableXxxxx 开启功能
*
*/
@EnableRabbit
@EnableDiscoveryClient
@SpringBootApplication
public class MallOrderApplication
public static void main(String[] args)
SpringApplication.run(MallOrderApplication.class,args);
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class MallOrderApplication
@Resource
AmqpAdmin amqpAdmin;
/**
*1,如何创建exchange[hello.java.exchange],queue,binding
* 1)、使用AmqpAdmin进行创建 管理组件,帮我们进行创建队列,绑定关系,销毁这些队列,后台的增删改查都能使用
*2,如何收发消息
*
*/
//创建出exchange
@Test
public void createExchange()
// public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
//三个参数:exchange的名字,是否持久化true,是否自动删除false
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("exchang[]创建成功","hello-java-exchange");
接着我们创建出队列
/**
* Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 姓名 是否持久化 是否排它 是否自动删除
* 排它如果是true,这个排它只能被声明的连接使用 只要有一条连接连上我们队列,是我们声明的这个连接,别人就连不上我们这个队列,实际开发中我们都不应该是排它,我们让所有人都能连接到队列,谁能接到消息可能就一个人能接到消息
*/
@Test
public void createQueue()
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("queue[]创建成功","hello-java-queue");
再将交换机与队列进行绑定
/**
* Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
* 目的地 目的地的类型 交换机 路由键 自定义参数
* 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定路由键
*/
@Test
public void binding()
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
log.info("binding[]创建成功","hello-java-binding");
接下来我们来测试发送消息
@Resource
RabbitTemplate rabbitTemplate;
//RabbitTemplate可以用来收发消息
@Test
public void sendMessage()
//1,发送消息
String msg="hello world";
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
//交换机 路由键 发送的消息
log.info("消息发送完成",msg);
我们将消息取出方便接下来测试 hello-java-queue已经清空为0
我们尝试传输一个对象(发送一个对象)
先新建一个对象
package com.hhxy.mall.order;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
@ToString
@Data
public class giao implements Serializable
private Long id;
private String name;
private Integer age;
@Resource
RabbitTemplate rabbitTemplate;
@Test
public void sendMessage()
giao giao = new giao();
giao.setId(1L);
giao.setName("giao");
giao.setAge(18);
//1,发送消息
// String msg="hello world";
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",giao);
log.info("消息发送完成",giao);
我们进入图形用户界面获取下消息
可以得知该消息是通过Java序列化得到的内容,要想看到真正的内容还得对内容进行反序列化
且发送对象必须对对象进行序列化,因为传输对象使用的序列化机制
我们发送的对象类型的消息,可以是一个json 在RabbitAutoConfiguration中RabbitTemplate中有消息转换器MessageConverter
消息转换器messageConverter在RabbitTemplateConfiguration构造的时候我们就会拿到所有的消息转换器
如果容器中有messageConverter,就用容器中的,就把messageConverter放到RabbitTemplate
如果容器中没有RabbitTemplate则使用SimpleMessageConverter
SimpleMessageConverter就是使用我们序列化机制 它是通过fromMessage进行消息转换
SimpleMessageConverter中还有创建消息createMessage
如果是string就直接做,如果是实现了序列化接口,它就使用序列化工具将它转换成一个byte[]数组
可以看到是messageConverter在起作用,如果我们想要变换messageConverter消息转换策略,我们就一起看看messageConverter内
messageConverter是一个接口我们看看方法内部
要想以json的形式传输我们就要在容器中加一个Jackson2JsonMessageConverter(给容器中放一个消息转换器)
package com.hhxy.mall.order.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitConfig
@Bean
public MessageConverter messageConverter()
return new Jackson2JsonMessageConverter();
添加了Jackson2JsonMessageConverter我们在测试下发送的对象的消息,获取出来是什么类型
在消息属性可以看到内容类型是json,而且在properties中还有type,那个实体类对象使用json形式传输
接下来我们测试下接收消息 我们想要接收某个队列的内容,就需要先监听这个队列
想要监听队列spring为我们抽取了个注解RabbitListener,它的作用就是监听我们指定的队列
其中方法:queues()可以指定我们想要监听的队列,只要这个队列有内容,我们就可以收到内容,而且该队列必须存在
如果我们只是测试收发消息,发消息之类的可以不用监听消息,我们可以不开启@EnableRabbit注解,如果想要监听消息就一定要先开启@EnableRabbit注解
接下来测试:
监听hello-java-queue这个队列的消息
现在随便进入一个业务逻辑获取写一个方法
/**
* queue:声明需要监听的所有队列
*/
@RabbitListener(queues = "hello-java-queue")
public void recieveMessage(Object message)
System.out.println("接收到消息。。。。内容:"+message+"=====>消息类型:"+message.getClass());
接着我们再发送一次消息可以发现服务接收到消息
接收到消息。。。。内容:(Body:'"id":1,"name":"giao","age":18' MessageProperties [headers=__TypeId__=com.hhxy.mall.order.giao, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-KustLHtvD-P25mE8sGmu6g, consumerQueue=hello-java-queue])=====>消息类型:class org.springframework.amqp.core.Message
换一种参数写法
/**
* queue:声明需要监听的所有队列
*
* class org.springframework.amqp.core.Message
*
* 参数类型可以写以下类型
* 1,Message message:原生消息详细信息。头+体
* 2,T<发送消息的类型>Giao giao
*/
@RabbitListener(queues = "hello-java-queue")
public void recieveMessage(Message message, Giao content)
byte[] body = message.getBody();
//'"id":1,"name":"giao","age":18'
MessageProperties messageProperties = message.getMessageProperties();
//消息头属性信息
System.out.println("接收到消息。。。。内容:"+message+"=====>内容:"+content);
接收到消息。。。。内容:(Body:'"id":1,"name":"giao","age":18' MessageProperties [headers=__TypeId__=com.hhxy.mall.order.Giao, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-6bu-qmHY7lcoqBMrSqzTqQ, consumerQueue=hello-java-queue])=====>内容:Giao(id=1, name=giao, age=18)
一个客户端只会建立一个连接 所有的数据都在通道里传输,所以我们可以获取到通道
/**
* queue:声明需要监听的所有队列
*
* class org.springframework.amqp.core.Message
*
* 参数类型可以写以下类型
* 1,Message message:原生消息详细信息。头+体
* 2,T<发送消息的类型>Giao giao
* 3.Channel channel:当前传输数据的通道
*/
@RabbitListener(queues = "hello-java-queue")
public void recieveMessage(Message message, Giao content, Channel channel)
byte[] body = message.getBody();
//'"id":1,"name":"giao","age":18'
MessageProperties messageProperties = message.getMessageProperties();
//消息头属性信息
System.out.println("接收到消息。。。。内容:"+message+"=====>内容:"+content);
* queue可以很多人来监听。只要收到消息,队列就会删除消息,而且只能有同一个收到此消息
* 场景:
* 1,订单服务启动多个
1,订单服务启动多个
当有多个订单服务监听这个消息队列时,获取消息到底是所有都获得还是只有一个呢?
改造发送消息的单元测试方法 for循环发送十次消息
查看接收消息的情况
虽然两个客户端都能接收到消息,但是可以发现同一个消息只能被一个客户端接收
为什么发现有几个消息没有被客户端接收,这三个消息并没有丢失,是因为单元测试中接收了三个消息
2,只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息
发送消息代码如上发送十条消息
客户端接收消息代码如下
@RabbitListener(queues = "hello-java-queue")
public void recieveMessage(Message message, Giao content, Channel channel) throws InterruptedException
System.out.println("接收到消息。。。。"+content);
byte[] body = message.getBody();
//'"id":1,"name":"giao","age":18'
MessageProperties messageProperties = message.getMessageProperties();
Thread.sleep(3000);
//消息头属性信息
System.out.println("消息处理完成=》"+content.getName());
了解RabbitListener和RabbitHandler
RabbitListener可以标记在类+方法上
RabbitHandler可以标记在方法上
二者之间区别 接收不同对象的场景下可以使用RabbitHandler重载处理
发现单元测试总是接收一部分消息不方便测试,编写一个控制类来发送消息 接着观察接收消息的对象
@RestController
public class RabbitController
@Resource
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMq")
public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num)
for (int i = 0; i <10 ; i++)
if (i%2==0)
OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
orderReturnApplyEntity.setId(1L);
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnApplyEntity);
else
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
return "ok";
@RabbitListener(queues = "hello-java-queue")
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService
/**
* queue:声明需要监听的所有队列
*
* class org.springframework.amqp.core.Message
*
* 参数类型可以写以下类型
* 1,Message message:原生消息详细信息。头+体
* 2,T<发送消息的类型>Giao giao
* 3.Channel channel:当前传输数据的通道
*
* queue可以很多人来监听。只要收到消息,队列就会删除消息,而且只能有同一个收到此消息
* 场景:
* 1,订单服务启动多个
* 2,只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息
*/
// @RabbitListener(queues = "hello-java-queue")
@RabbitHandler
public void recieveMessage(Message message, OrderReturnApplyEntity orderReturnApp以上是关于消息队列RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章