初识MQ和RabbitMQ

Posted 洛水|天依

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初识MQ和RabbitMQ相关的知识,希望对你有一定的参考价值。

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

MQ解决什么问题

MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具,有以下几种主要的作用:

异步处理:用户注册后,发送注册邮件和注册短信。用户注册完成后,提交任务到 MQ,发送模块并行获取 MQ 中的任务。

系统解耦:比如用注册完成,再加一个发送微信通知。只需要新增发送微信消息模块,从 MQ 中读取任务,发送消息即可。无需改动注册模块的代码,这样注册模块与发送模块通过 MQ 解耦。

流量削峰:秒杀和抢购等场景经常使用 MQ 进行流量削峰。活动开始时流量暴增,用户的请求写入MQ,超过 MQ 最大长度丢弃请求,业务系统接收 MQ 中的消息进行处理,达到流量削峰、保证系统可用性的目的。

日志处理:日志采集方收集日志写入 kafka 的消息队列中,处理方订阅并消费 kafka 队列中的日志数据。

消息通讯:点对点或者订阅发布模式,通过消息进行通讯。如微信的消息发送与接收、聊天室等。

今天介绍RabbitMQ

RabbitMQ是MQ消息队列的一种,我们一般使用的是Spring集合后的SpringAMQP.

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

AMQP 是一种高级消息队列协议.而SpringAMQ是基于AMQP协议制订的一套api规范,提供了模范来发送和接收消息.

SpringAMQP提供了三个功能

  • 自动声明队列、交换机及其绑定关系 (绑关系)
  • 基于注解的监听器模式,异步接收消息 (接收消息)
  • 封装了RabbitTemplate工具,用于发送消息 (发送消息)

SpringAMQP的简单使用步骤:

1.在父工程中引入依赖,(依赖中包含了RabbitMQ的依赖)

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写消息发送方和消息接收方的配置文件.这里是让rabbitMQ服务端和消息的发送方和接收方建立了联系

spring:
  rabbitmq:
    host: 192.168.150.100 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: username # 用户名
    password: 1234 # 密码

3.在消息的接收方,新建一个类用来监听发送方发出的消息.

@Component
public class SpringRabbitListener 

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException 
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    

注意这个类是要交给Spring管理的所以加上@Component.监听的动作交给Spring,如果监听到这个队列中有消息,就会接收到,不用我们自己进行任何的操作

4.在消息的发送方,使用RabbitTemplet来发送消息到消息队列

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest 

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() 
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    

这是个简单的发送和接收的例子.

完整的消息的发送涉及到的对象及步骤,消息生产者Producer首先和RabbitMQ服务器建立连接,获取通道channel, 然后生产者发送消息给指定的虚拟机中的交换机,交换机交换机根据消息的routingKey将消息路由(转发)给指定的队列.然后消费者Consumer首先也是和Rabbit建立连接,获取通道channel,然后消费者监听指定的队列,如果监听的队列Queue中有消息了,就可以从消息队列中获取到Producer发送的的\\消息了.

完整架构如下所示:

发布/订阅

发布订阅的模型如图:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

具体的实现流程:

  1. 导入依赖 作用:引入一个Rabbit服务器,

    <!--AMQP依赖,包含RabbitMQ--><dependency>  
      <groupId>org.springframework.boot</groupId>   
       <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
    
  2. 编写消息发送者和消息接收者(消费者)的yml配置文件 作用: 和引入的Rabbit服务器建立联系,获取通道channel (Spring自动完成)

    spring:
      rabbitmq:
        host: 192.168.100.100 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: usname # 用户名
        password: 123456 # 密码
    
  3. 在消息的接收方这边创建监听. (通过注解@RabbitListener 声明出队列和交换机)

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"), 
            exchange = @Exchange(name = "itcast.direct",  type = ExchangeTypes.DIRECT),
            key = "red","blue"  //队列的标签key 与交换机的key做匹配
    ))
    
  4. 在消息生产者这边引用RabbitTemplate对象发送消息(Spring管理着这个RabbitTemplate的Bean,自动注入就可以使用)给交换机

  5. 交换机是消费者的接收方声明的.发送发也可以声明,这里有常用的几种类型的交换机

    Exchange:常用以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  6. 交换机根据不同的routingkey把消息转发给匹配的消息队列,消费者监听者监听的通道中有了交换机转发的消息就获取消息.

    描述下Direct交换机与Topic交换机的差异?

    • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
    • Topic交换机与队列绑定时的bindingKey可以指定通配符
    • #:代表0个或多个词
    • *:代表1个词

    这里边消息的发送存在一个消息转化的问题,Spring使用的是jdk的消息序列化器,会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。DK序列化存在下列问题:

    • 数据体积过大
    • 有安全漏洞
    • 可读性差

    JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

    具体的实现

    ​ 就是导入依赖在消息接收的和发送的都需要导入,一个序列化,一个反序列化

    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
    

    ​ 在消息接收的和发送的启动类中添加一个Bean即可,Spring启动的时候,发现有这个bean就不创建这个了,直接管理这个bean

    @Bean
    public MessageConverter jsonMessageConverter()
        return new Jackson2JsonMessageConverter();
    
    

    在spring中有一个简单的配置,可以解决

    多个消息接收器处理能力不同的的情况下,我们想让能力强的多处理,也就是处理完了消息就从消息对列中拿消息去处理,能力少的根据能力处理多少.默认是多个消息处理器轮询分发消息,平均每个处理器获取到的消息是一样的,但是我们可以通过设置prefetch来控制消费者预取的消息数量,从而达到能者多劳的效果

      rabbitmq:
        listener:
          simple:
            prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    

学习随笔记录>>>>有不对.欢迎指教

以上是关于初识MQ和RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 服务异步通信 -- 初识MQ(同步通信和异步通信MQ几种常见MQ的对比)RabbitMQ安装和介绍

初识RabbitMQ系列之一:简单介绍

RabbitMQ学习笔记-p1(初识MQ&快速入门)

01-初识消息队列MQ&&Rabbit相关概念介绍

RabbitMQ入门小结

RabbitMQ学习初识RabbitMQRabbitMQ的安装