RabbitMQ(简介概念安装和springboot整合)

Posted 李潘杜若

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ(简介概念安装和springboot整合)相关的知识,希望对你有一定的参考价值。

RabbitMQ(简介、概念、安装和springboot整合)

一、MQ简介

​ 在计算机科学中,消息队列((英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

1.1.实现
  • 消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。
  • 目前,有很多消息队列有很多开源的实现,包括JBoss Messaging.JORAM、Apache ActiveMQ、Sun0pen Message Queue、IBM MQ、Apache Qpid和HTTPSQS。
  • 当前使用较多的消息队列有RabbitMQ、RocketNQ、ActiveMQ、Kafka、ZeroNQ、MetaMq等,而部分数据库如Redis、mysql以及phxsql也可实现消息队列的功能。
1.2.特点

​ MQ是消费者-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
注意:

  1. AMQP,即DAdvanced MessageQueuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
  2. JMS,Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,
    用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。常见的消息队列,大部分都实现了JMSAPI,如
    ActiveMQ ,Redis以及 RabbitMQ等。
1.3.优缺点

优点

应用耦合、异步处理、流量削锋

  • 解耦

    传统模式:

传统模式的缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
中间件模式:

中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
。异步
传统模式:

传统模式的缺点:
—些非必要的业务逻辑以同步的方式运行,太耗费时间。

中间件模式:

中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

  • 削峰

    传统模式:

传统模式的缺点:
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式:

中间件模式的的优点:
系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
缺点
系统可用性降低、系统复杂性增加

1.4.使用场景

​ 消息队列,是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候
​ 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

1.5.为什么使用RabbitMQ

​ AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

​ AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

​ RabbitMQ是一个开源的AMQP实现,服务器端用Erlangi语言编写,支持多种客户端,如: Python、
Ruby、 .NET,Java,JMS、C,php,ActionScript, XMPP,STONP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

总结如下:

  • 基于AMQP协议
  • 高并发(是一个容量的概念,服务器可以接受的最大任务数量)。
  • 高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)
  • 高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)。
  • 强大的社区支持,以及很多公司都在使用
  • 支持插件
  • 支持多语言

二、概念

  • RabbitMQ简介:RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

  • Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers(headers和direct交换器完全一致,但性能差很多,目前几乎用不到),不同类型的Exchange转发消息的策略有所区别。

  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。

  • Connection:网络连接,比如一个TCP连接。

  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP 连接。

  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加

    密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  • Broker:表示消息队列服务器实体

三、安装

  1. 获取镜像
#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management
  1. 运行镜像
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式三
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

	# 4369, 25672 (Erlang发现&集群端口) 
	# 5672, 5671 (AMQP端口) 15672 (web管理后台端口) 
	# 61613, 61614 (STOMP协议端口) 
	# 1883, 8883 (MQTT协议端口) 
	# https://www.rabbitmq.com/networking.html
  1. 访问ui界面
http://localhost:15672/

四、SpringBoot整合RabbitMQ

4.1. 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2. application.yml配置
spring:
  rabbitmq:
    host: 192.168.10.123
    port: 5672
    virtual-host: /
    #开启发送端确认
    #publisher-confirms: true
    publisher-confirm-type: correlated
    # 开启发送端消息抵达队列的确认
    publisher-returns: true
    template:
      #只要抵达队列,以异步发送优先调用我们这个return - confirm
      mandatory: true
    listener:
      simple:
        #手动ack消息(手动确认消息是否消费)
        acknowledge-mode: manual
4.3. RabbitConfig配置类
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class MyRabbitConfig 

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    

    /**
     * 设置传输消息格式为json
     * @return
     */
    @Bean
    public MessageConverter messageConverter() 
        return new Jackson2JsonMessageConverter();
    

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息),即手动确认
     *      消费端宕机,消息也不会丢失(channel.basicAck() )
     *
     */
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() 

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
       /* rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() 
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) 
                System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
            
        );*/
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> 
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        );


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> 
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        );
    


4.4. RabbitMQConfig(容器中创建交换机、队列和绑定)
package com.lyh.mall.order.config;

import com.lyh.mall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.HashMap;


/**
 * MQ交换机、普通队列、死信队列和绑定
 * 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ中不存在的情况下)
 **/

@Configuration
public class MyRabbitMQConfig 

    /**
     * 测试
     */
    /*@RabbitListener(queues = "order.release.order.queue")
    public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException 
        System.out.println("收到过期的订单消息:准备关闭订单"+orderEntity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    */
  
   /**
     * 使用JSON序列化机制,进行消息转换
     * @return
     */
    @Bean
    public MessageConverter messageConverter() 
        return new Jackson2JsonMessageConverter();
    

    /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ中不存在的情况下) */

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue orderDelayQueue() 
        /*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
        HashMap<String, Object> arguments = new HashMap<>();
        //死信路由
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        //死信路由键
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        //创建队列
        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);

        return queue;
    

    /**
     * 普通队列
     *
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() 

        Queue queue = new Queue("order.release.order.queue", true, false, false);

        return queue;
    

    /**
     * TopicExchange
     * 创建交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange() 
        /*
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         * */
        return new TopicExchange("order-event-exchange", true, false);

    

    /**
     * 绑定死信队列
     * @return
     */
    @Bean
    public Binding orderCreateBinding() 
        /*
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    

    /**
     * 绑定普通队列
     * @return
     */
    @Bean
    public Binding orderReleaseBinding() 

        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    

    /**
     * 订单释放直接和库存释放进行绑定
     * @return
     */
    @Bean
    public Binding orderReleaseOtherBinding() 

        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null);
    


    /**
     * 商品秒杀队列
     * @return
     */
    @Bean
    public Queue orderSecKillOrrderQueue() 
        Queue queue = new Queue("order.seckill.order.queue", true, false, false);
        return queue;
    

    @Bean
    public Binding orderSecKillOrrderQueueBinding() 
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        // 			Map<String, Object> arguments
        Binding binding = new Binding(
                "order.seckill.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.seckill.order",
                null);

        return binding;
    


4.5. 测试代码
  • AmqpAdmin:管理组件
  • RabbitTemplate:消息发送处理组件
  • @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)Object content, Message message,Channel channel
import com.lyh.mall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;

@SpringBootTest
class MallOrderApplicationTests 

    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     */
    @Test
    void sendMessige()
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("你好!");
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
        System.out.println("发送消息成功!!");
    

    /**
     * 创建交换机
     */
    @Test
    void createExchange() 
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        System.out.println("创建成功");
    

    /**
     * 创建队列
     */
    @Test
    void createQueue() 
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        System.out.println("创建成功");
    

    /**
     * 创建绑定
     */
    @Test
    void createBinding() 
        //String destination【目的地】,
        // DestinationType destinationType【目的地类型】,
        // String exchange【交换机】,
        // String routingKey【路由键】,
        //@Nullable Map<String, Object> arguments【自定义参数】
        Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
        System.out.println("创建成功");
    



//service层加监听注解获取 消息数据

/**
     * 监听消息
     * queues 声明需要监听的所有队列
     * org.springframework.amqp.core.Message
     * <p>
     * 参数可以写一下类型
     * 1、Message essage: 原生消息详细信息。头+体
     * 2、发送的消息的类型: OrderReturnReasonEntity content;
     * 3、Channel channel:当前传输数据的通道
     * <p>
     * Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只能有一个收到此消息
     * 1)、订单服务启动多个:同一个消息,只能有一个客户端收到
     * 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
     */
     @RabbitListener(queues = "hello-java-queue")
    //这个类的这个方法才能接受hello-java-queue消息
    //@RabbitHandler  //类上加注解@RabbitListener(queues = "hello-java-queue")
    public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) 

        //拿到消息体
//        byte[] body = message.getBody();
        //拿到消息头
//        MessageProperties properties = message.getMessageProperties();

        System.out.println("接收到消息:" + content);

        //消息处理完 手动确认  deliveryTag在Channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag->" + deliveryTag);

        try 
            if (deliveryTag % 2 == 0) 
                //确认签收 队列删除该消息 false非批量模式
                channel.basicAck(deliveryTag, false);
             else 
                //拒收退货 第三个参数 -> true:重新入队 false:丢弃
                channel.basicNack(deliveryTag, false, true);
            
         catch (IOException e) 
            //网络中断
        
    

    //    @RabbitHandler
    //public void receiveMessage2(OrderEntity content) 

       // System.out.println("接收到消息:" + content);
    //

以上是关于RabbitMQ(简介概念安装和springboot整合)的主要内容,如果未能解决你的问题,请参考以下文章

一文搞懂 RabbitMQ 的重要概念以及安装

RabbitMQ从概念到使用从Docker安装到RabbitMQ整合Springboot1.5w字保姆级教学

RabbitMQ安装教程

RabbitMQ简介RabbitMQ 特点AMQP 概念生产者和消费者 Broker 服务节点Queue 队列Exchange 交换器如何保证消息的可靠性?

RabbitMQ

RabbitMQ基本概念和Linux安装配置RabbitMQ