RabbitMQ笔记
Posted XLrong2000
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记相关的知识,希望对你有一定的参考价值。
MQ
MessageQueue,消息队列,存放消息的队列,也就是事件驱动架构中的Broker
同步通讯和异步通讯
同步调用
Feign、RestTemplate
同步调用优缺点
- 优点
- 时效性强,可以立刻得到结果
- 缺点
- 耦合度高
- 性能下降
- 资源浪费
- 级联失败
异步调用
为了解除时间发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)
发布者发布事件到Broker,不关心谁来订阅事件
订阅者从Broker订阅事件,不关心谁发来的信息
Broker是一个像数据总线的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控
异步调用优缺点
- 优点
- 耦合度低: 每个服务都可以灵活插拔,可替换
- 吞吐量提升: 无需等待订阅者处理完成,响应更快捷
- 故障隔离: 服务没有直接调用,不存在级联失败问题
- 流量削峰: 不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理时间
- 调用间没有阻塞: 不会造成无效的资源占用
- 缺点
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂,业务没有明显的流程先,不好追踪管理
比较常见的MQ实现
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
- 追求可用性:Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性:RabbitMQ、RocketMQ
- 追求吞吐能力:RocketMQ、Kafka
- 追求消息低延迟:RabbitMQ、Kafka
RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息通信中间件
安装RabbitMQ
- 拉取镜像
docker pull rabbitmq:3.8-management
- 运行容器
docker run \\
-e RABBITMQ_DEFAULT_USER=admin \\
-e RABBITMQ_DEFAULT_PASS=123456 \\
-v mq-plugins:/plugins \\
--name mq \\
--hostname mq \\
-p 15672:15672 \\
-p 5672:5672 \\
-d \\
rabbitmq:3.8-management
重要概念
- channel: 操作MQ的工具
- exchange: 消息路由,消息发送给它,它再发送到队列中
- queue: 缓存消息
- virtual host: 虚拟主机,是对queue、exchange等资源的逻辑分组
SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模版,并且还利用SpringBoot对其实现了自动装配
AMQP是一套用于在应用程序之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
SpringAMQP是基于AMQP协议定义的一套API规范,提供了模版来发送和接受消息,包括两部分,其中SpringAMQP是基础抽象,SpringRabbit是底层的默认实现
SpringAMQP特征
- 侦听器容器:用于一部处理入栈消息
- 用于发送和接收消息的RabbitTemplate
- RabbitAdmin用于自动声明队列,交换和绑定
使用步骤
- 引入AMQP依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 便写配置文件
spring:
rabbitmq:
host: ip
port: 5672
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
- 便写发送消息测试类
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue()
String queueName = "simple.queue";
String message = "msg";
rabbitTemplate.convertAndSend(queueName, message);
- 便携配置文件
spring:
rabbitmq:
host: ip
port: 5672
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
- 便写接受消息测试类
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException
System.out.println("spring 消费者接收到消息: [" + msg + "]");
RabbitMQ模型
- 基本队列模型
- 工作队列模型
- 发布订阅模型
- 广播Fanout
- 路由Direct
- 通配Topic
基本队列
- 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编写配置类,添加mq连接信息
- 在publisher服务终利用RabbitTemplate发送信息到队列
- 在consumer服务中编写消费逻辑,绑定队列
基本消息队列的信息发送流程
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接受流程
- 建立connection
- 创建channel(消费者也创建队列是为了防止队列不存在)
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
工作队列
实现一个队列绑定多个队列
-
投递消息预取的方式默认是轮询
-
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
-
可以指定一个消费者处理完当前消息之后再去取新的消息:
spring:
rabbitmq:
listener:
prefetch: 1 # 每次只能取一条数据,处理完之后才能取下一条数据
发布/订阅
广播Fanout
路由Direct
会将接收到的信息根据规则路由到指定的Queue,因此称为路由模式
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送信息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey
通配Topic
- TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以
.
分割 - Queue与Exchange指定BindingKey时可以使用通配符
- #: 代指0或多个单词
- *: 代指一个单词
SpringAMQP消息转换器
以上是关于RabbitMQ笔记的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ学习笔记3:RabbitMQ快速入门消息模式