初识MQ和RabbitMQ
Posted 洛水|天依
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初识MQ和RabbitMQ相关的知识,希望对你有一定的参考价值。
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
MQ解决什么问题
MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具,有以下几种主要的作用:
异步处理:用户注册后,发送注册邮件和注册短信。用户注册完成后,提交任务到 MQ,发送模块并行获取 MQ 中的任务。
系统解耦:比如用注册完成,再加一个发送微信通知。只需要新增发送微信消息模块,从 MQ 中读取任务,发送消息即可。无需改动注册模块的代码,这样注册模块与发送模块通过 MQ 解耦。
流量削峰:秒杀和抢购等场景经常使用 MQ 进行流量削峰。活动开始时流量暴增,用户的请求写入MQ,超过 MQ 最大长度丢弃请求,业务系统接收 MQ 中的消息进行处理,达到流量削峰、保证系统可用性的目的。
日志处理:日志采集方收集日志写入 kafka 的消息队列中,处理方订阅并消费 kafka 队列中的日志数据。
消息通讯:点对点或者订阅发布模式,通过消息进行通讯。如微信的消息发送与接收、聊天室等。
今天介绍RabbitMQ
RabbitMQ是MQ消息队列的一种,我们一般使用的是Spring集合后的SpringAMQP.
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
AMQP 是一种高级消息队列协议.而SpringAMQ是基于AMQP协议制订的一套api规范,提供了模范来发送和接收消息.
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系 (绑关系)
- 基于注解的监听器模式,异步接收消息 (接收消息)
- 封装了RabbitTemplate工具,用于发送消息 (发送消息)
SpringAMQP的简单使用步骤:
1.在父工程中引入依赖,(依赖中包含了RabbitMQ的依赖)
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.编写消息发送方和消息接收方的配置文件.这里是让rabbitMQ服务端和消息的发送方和接收方建立了联系
spring:
rabbitmq:
host: 192.168.150.100 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: username # 用户名
password: 1234 # 密码
3.在消息的接收方,新建一个类用来监听发送方发出的消息.
@Component
public class SpringRabbitListener
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException
System.out.println("spring 消费者接收到消息:【" + msg + "】");
注意这个类是要交给Spring管理的所以加上@Component.监听的动作交给Spring,如果监听到这个队列中有消息,就会接收到,不用我们自己进行任何的操作
4.在消息的发送方,使用RabbitTemplet来发送消息到消息队列
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue()
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
这是个简单的发送和接收的例子.
完整的消息的发送涉及到的对象及步骤,消息生产者Producer首先和RabbitMQ服务器建立连接,获取通道channel, 然后生产者发送消息给指定的虚拟机中的交换机,交换机交换机根据消息的routingKey将消息路由(转发)给指定的队列.然后消费者Consumer首先也是和Rabbit建立连接,获取通道channel,然后消费者监听指定的队列,如果监听的队列Queue中有消息了,就可以从消息队列中获取到Producer发送的的\\消息了.
完整架构如下所示:
发布/订阅
发布订阅的模型如图:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
具体的实现流程:
-
导入依赖 作用:引入一个Rabbit服务器,
<!--AMQP依赖,包含RabbitMQ--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
编写消息发送者和消息接收者(消费者)的yml配置文件 作用: 和引入的Rabbit服务器建立联系,获取通道channel (Spring自动完成)
spring: rabbitmq: host: 192.168.100.100 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: usname # 用户名 password: 123456 # 密码
-
在消息的接收方这边创建监听. (通过注解@RabbitListener 声明出队列和交换机)
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = "red","blue" //队列的标签key 与交换机的key做匹配 ))
-
在消息生产者这边引用RabbitTemplate对象发送消息(Spring管理着这个RabbitTemplate的Bean,自动注入就可以使用)给交换机
-
交换机是消费者的接收方声明的.发送发也可以声明,这里有常用的几种类型的交换机
Exchange:常用以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
-
交换机根据不同的routingkey把消息转发给匹配的消息队列,消费者监听者监听的通道中有了交换机转发的消息就获取消息.
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
这里边消息的发送存在一个消息转化的问题,Spring使用的是jdk的消息序列化器,会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。DK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
具体的实现
就是导入依赖在消息接收的和发送的都需要导入,一个序列化,一个反序列化
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
在消息接收的和发送的启动类中添加一个Bean即可,Spring启动的时候,发现有这个bean就不创建这个了,直接管理这个bean
@Bean public MessageConverter jsonMessageConverter() return new Jackson2JsonMessageConverter();
在spring中有一个简单的配置,可以解决
多个消息接收器处理能力不同的的情况下,我们想让能力强的多处理,也就是处理完了消息就从消息对列中拿消息去处理,能力少的根据能力处理多少.默认是多个消息处理器轮询分发消息,平均每个处理器获取到的消息是一样的,但是我们可以通过设置prefetch来控制消费者预取的消息数量,从而达到能者多劳的效果
rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
- Topic交换机接收的消息RoutingKey必须是多个单词,以
学习随笔记录>>>>有不对.欢迎指教
以上是关于初识MQ和RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章