RabbitMQ笔记

Posted XLrong2000

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记相关的知识,希望对你有一定的参考价值。

MQ

MessageQueue,消息队列,存放消息的队列,也就是事件驱动架构中的Broker

同步通讯和异步通讯

同步调用

Feign、RestTemplate

同步调用优缺点
  • 优点
    • 时效性强,可以立刻得到结果
  • 缺点
    • 耦合度高
    • 性能下降
    • 资源浪费
    • 级联失败

异步调用

为了解除时间发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)
发布者发布事件到Broker,不关心谁来订阅事件
订阅者从Broker订阅事件,不关心谁发来的信息
Broker是一个像数据总线的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控

异步调用优缺点
  • 优点
    • 耦合度低: 每个服务都可以灵活插拔,可替换
    • 吞吐量提升: 无需等待订阅者处理完成,响应更快捷
    • 故障隔离: 服务没有直接调用,不存在级联失败问题
    • 流量削峰: 不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理时间
    • 调用间没有阻塞: 不会造成无效的资源占用
  • 缺点
    • 依赖于Broker的可靠性、安全性、吞吐能力
    • 架构复杂,业务没有明显的流程先,不好追踪管理

比较常见的MQ实现

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ
  • 追求可靠性:RabbitMQ、RocketMQ
  • 追求吞吐能力:RocketMQ、Kafka
  • 追求消息低延迟:RabbitMQ、Kafka

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件

安装RabbitMQ

  1. 拉取镜像
docker pull rabbitmq:3.8-management
  1. 运行容器
docker run \\
 -e RABBITMQ_DEFAULT_USER=admin \\
 -e RABBITMQ_DEFAULT_PASS=123456 \\
 -v mq-plugins:/plugins \\
 --name mq \\
 --hostname mq \\
 -p 15672:15672 \\
 -p 5672:5672 \\
 -d \\
 rabbitmq:3.8-management

重要概念

  • channel: 操作MQ的工具
  • exchange: 消息路由,消息发送给它,它再发送到队列中
  • queue: 缓存消息
  • virtual host: 虚拟主机,是对queue、exchange等资源的逻辑分组

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模版,并且还利用SpringBoot对其实现了自动装配

AMQP是一套用于在应用程序之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求

SpringAMQP是基于AMQP协议定义的一套API规范,提供了模版来发送和接受消息,包括两部分,其中SpringAMQP是基础抽象,SpringRabbit是底层的默认实现

SpringAMQP特征

  • 侦听器容器:用于一部处理入栈消息
  • 用于发送和接收消息的RabbitTemplate
  • RabbitAdmin用于自动声明队列,交换和绑定

使用步骤

  1. 引入AMQP依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 便写配置文件
spring: 
  rabbitmq: 
    host: ip
    port: 5672
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123456  # 密码
  1. 便写发送消息测试类
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() 
    String queueName = "simple.queue";
    String message = "msg";
    
    rabbitTemplate.convertAndSend(queueName, message);

  1. 便携配置文件
spring: 
  rabbitmq: 
    host: ip
    port: 5672
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123456  # 密码
  1. 便写接受消息测试类
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException 
    System.out.println("spring 消费者接收到消息: [" + msg + "]");

RabbitMQ模型

  • 基本队列模型
  • 工作队列模型
  • 发布订阅模型
    • 广播Fanout
    • 路由Direct
    • 通配Topic

基本队列

  1. 导入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 编写配置类,添加mq连接信息
  2. 在publisher服务终利用RabbitTemplate发送信息到队列
  3. 在consumer服务中编写消费逻辑,绑定队列
基本消息队列的信息发送流程
  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息
基本消息队列的消息接受流程
  1. 建立connection
  2. 创建channel(消费者也创建队列是为了防止队列不存在)
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

工作队列

实现一个队列绑定多个队列

  • 投递消息预取的方式默认是轮询

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

  • 可以指定一个消费者处理完当前消息之后再去取新的消息:

spring: 
  rabbitmq: 
    listener: 
      prefetch: 1 # 每次只能取一条数据,处理完之后才能取下一条数据

发布/订阅

广播Fanout
路由Direct

会将接收到的信息根据规则路由到指定的Queue,因此称为路由模式

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送信息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey
通配Topic
  • TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割
  • Queue与Exchange指定BindingKey时可以使用通配符
    • #: 代指0或多个单词
    • *: 代指一个单词

SpringAMQP消息转换器

以上是关于RabbitMQ笔记的主要内容,如果未能解决你的问题,请参考以下文章

分布式消息中间件之RabbitMQ学习笔记[一]

分布式消息中间件之RabbitMQ学习笔记[一]

Python 学习笔记 - RabbitMQ

RabbitMQ学习笔记3:RabbitMQ快速入门消息模式

RabbitMQ学习笔记3:RabbitMQ快速入门消息模式

RabbitMQ入门学习笔记