RabbitMQ 学习笔记

Posted dingwen_blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 学习笔记相关的知识,希望对你有一定的参考价值。

1.中间件

中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来。

1.1 特点

  • 满足大量应用的需要
  • 运行于多种硬件和 OS平台
  • 支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互
  • 支持标准的协议
  • 支持标准的接口

1.2消息中间件

利用可靠的消息传递机制进行系统和系统直接的通讯,通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯,是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务。

1.3 消息组成部分

  • 协议
  • 持久化机制
  • 分发策略
  • 容错

1.4协议

  1. 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流2. 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高3. 协议对数据格式和计算机之间交换数据都必须严格遵守规范。

1.5 协议组成

  • 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
  • 语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
  • 时序:时序是对事件发生顺序的详细说明

1.6 AMQP

AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。(支持者: RabbitMQ ActiveMQ)

  • 分布式事务

  • 消息的持久化

  • 高性能和高可靠

    1.6.1 生产者流转过程

    在这里插入图片描述

    1.6.2 消费者流转过程

在这里插入图片描述

1.7 MQTT

MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分。支持者: RabbitMQ ActiveMQ)

  • 轻量
  • 结构简单
  • 传输快
  • 不支持事务
  • 不支持持久化

1.8 OpenMessage

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准。

  • 结构简单
  • 解析速度快
  • 事务
  • 持久化

1.9 Kafka

Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成。

  • 结构简单
  • 解析速度快
  • 无事务支持
  • 有持久化设计

2. 安装

2.1 rpm安装

https://blog.csdn.net/qq_38020915/article/details/117957570

2.2 docker安装

https://blog.csdn.net/qq_38020915/article/details/117957001

3.RabbitMQ

3.1 核心概念

  • Server:又称Broker,接收客户端连接,实现AMQP服务。
  • Connection: 应用程序和Broker的网络连接。TCPIP三次握手四次握手
  • Channel: 网络信道,对应一个会话任务,进行消息的读写
  • Message: 消息,服务与应用程序之间传送的数据,由Properties(对消息的描述)和body(消息)组成
  • Virtual Host: 虚拟地址,用于进行逻辑隔离
  • Queue: 队列
  • Exchange: 交换机,根据路由键分发到队列
  • Bindings: 交换机和队列之间的虚拟连接
  • Routing key: 路由规则

3.2 组成

在这里插入图片描述

3.3运行流程

在这里插入图片描述

3.4 消息模型

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

  • 简单模式

  • 工作模式

    • 公平分发(能者多劳)

      • producer

        跟简单模式一样

      • consumer

        • maven

          //简单模式
          public class Consumer{
          	//3.接受内容
              //指标定义出来
              channel.basicQos(1);
              channel.basicConsume("queue1",false,new DefaultConsumer(){
                  public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(new String("收到消息是" + new String(meassage.getBody()),"UTF-8"));
                    //改成手动应答
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                  },new CancelCallback(){
                      public void handle(String consumerTag) throws IOException {
                          System.out.println("接受失败了");
                  }
                });
              //4.关闭
              channel.close();
              connection.close();
          }
          
        • springboot

              # TODO 公平分发 0:(默认)轮询 1:公平分发
          #    listener:
          #      direct:
          #        prefetch: 1
          #        acknowledge-mode: manual
          
          
    • 轮询

      跟简单模式一样。

  • 发布订阅模式

  • 路由模式

  • 主题模式

  • 参数模式

  • 发布确认模式

3.5 使用场景、解决问题

  • 线程池异步处理(削峰)
  • 解耦(模块分离,高内聚,低耦合)
  • 消息可靠(持久化)
  • 高可用(集群)

3.6 SpringBoot 2.x 整合使用

完整代码地址: https://gitee.com/dingwen-gitee/stu-spr-boo-rab.git

3.6.1 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.dingwen</groupId>
        <artifactId>stu-spr-boo-rab</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.dingwen.rabcon</groupId>
    <artifactId>rab-con</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>rab-con</name>
    <description>rab-con</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <!--rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

3.6.2 配置(fanout)

package com.dingwen.rabcon.config;

import com.dingwen.rabcon.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq 配置类
 *
 * @author dingwen
 * 2021.06.17 10:36
 */
@Configuration
public class RabbitmqConfiguration {

    /**
     * 声明fanout模式的交换机
     *
     * @return {@link FanoutExchange}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        // 持久化、关闭自动删除
        return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
    }

    /**
     * 短信任务队列
     *
     * @return {@link Queue}
     */
    @Bean
    public Queue smsQueue() {
        // 持久化、排他性队列、关闭自动删除
        return new Queue(RabbitConstant.FANOUT_QUEUE_SMS_NAME);
    }

    /**
     * 邮件任务队列
     *
     * @return {@link Queue}
     */
    @Bean
    public Queue emailQueue() {
        // 持久化、排他性队列、关闭自动删除
        return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME);
    }

    /**
     * 短信队列绑定到交换机
     *
     * @return {@link Binding}
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    /**
     * 邮箱队列绑定到交换机
     *
     * @return {@link Binding}
     */
    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

}

3.6.3 producer

package com.dingwen.rabpro.service.impl;

import com.dingwen.rabpro.constant.RabbitConstant;
import com.dingwen.rabpro.service.OrderService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * order service impl
 *
 * @author dingwen
 * 2021.06.17 11:11
 */
@Service
public class OrderServiceImpl implements OrderService {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public OrderServiceImpl(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void placeOrder() {
        rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_NAME, null, UUID.randomUUID().toString());
    }
}

3.6.4 consumer

package com.dingwen.rabcon.service.impl;

import com.dingwen.rabcon.constant.RabbitConstant;
import com.dingwen.rabcon.service.EmailService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Date;


/**
 * email service impl
 *
 * @author dingwen
 * 2021.06.17 11:22
 */
@Service
@RabbitListener(queues = {RabbitConstant.FANOUT_QUEUE_EMAIL_NAME})
public class EmailServiceImpl implements EmailService {

    @Override
    @RabbitHandler
    public void sendEmail(String message) {
        System.out.println("E: message = " + message);
    }
}

3.6.5 topic匹配规则

  • “*”: 只有一个一级
  • “#”: 至少一个一级

3.7 持久化

原文地址: https://zhuanlan.zhihu.com/p/74840278

3.7.1 消息什么时候需要持久化

  • 消息本身在publish的时候就要求消息写入磁盘
  • 内存紧张,需要将部分内存中的消息转移到磁盘

3.7.2 消息什么时候会刷到磁盘

  • 写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)
  • 有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每隔25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘
  • 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作

3.7.3 消息保存到磁盘个格式

消息保存于$MNESIA/msg_store_persistent/x.rdq文件中,其中x为数字编号,从1开始,每个文件最大为16M(16777216),超过这个大小会生成新的文件,文件编号加1。

3.7.4 文件何时删除

  • 当所有文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率
  • publish消息时写入内容,ack消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件

3.7.5 存储位置

在这里插入图片描述

3.8 排他队列(Exclusive Queue)

只有自己可见的队列,即不允许其它用户访问

  • 只对首次声明它的连接(Connection)可见
  • 在其连接断开的时候自动删除(包括持久态队列)
  • 是限制连接而不是通道

如果试图在一个不同的连接中重新声明或访问(如publish,consume)该排他性队列,会得到资源被锁定的错误ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'UserLogin2'

3.9 阻塞队列

当队列为空后者队列慢的情况下回等待,当队列有空不会空是进行操作。

4.RabbitMQ高级

4.1 过期时间

可对消息设置过期时间(TTL),表示消息在这个时间内可以被消费。消息过期后被删除或放入死信队列。可对单条消息进行设置,也可以对整个队列进行设置。如果两种方法同时使用则按照TTL小的为准。

4.1.1 消息TTL

package com.dingwen.rabpro.service.impl;

import com.dingwen.rabpro.constant.RabbitConstant;
import com.dingwen.rabpro.service.OrderService;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * order service impl
 *
 * @author dingwen
 * 2021.06.17 11:11
 */
@Service
public class OrderServiceImpl implements OrderService {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public OrderServiceImpl(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void placeOrder() {

        // 测试消息过期时间
        MessagePostProcessor messagePostProcessor = (message) -> {
            MessageProperties messageProperties = message.getMessageProperties();
            messageProperties.setContentEncoding("UTF-8");
            // 过期时间 5000 毫秒
            messageProperties.setExpiration("5000");
            return message;
        };
        // fanout
        rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_NAME, null, UUID.randomUUID().toString(), messagePostProcessor);

        // direct topic
        // 同上需要指定路由key
    }
}



4.1.2 队列TTL

package com.dingwen.rabcon.config;

import com.dingwen.rabcon.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * rabbitmq 配置类
 *
 * @author dingwen
 * 2021.06.17 10:36
 */
@Configuration
public class RabbitmqConfigurationFanout {

    /**
     * 声明fanout模式的交换机
     *
     * @return {@link FanoutExchange}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        // 持久化、关闭自动删除
        return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
    }

    /**
     * 短信任务队列
     *
     * @return {@link Queue}
     */
    @Bean
    public Queue smsQueue() {
        // 持久化、排他性队列、关闭自动删除
        return new Queue(RabbitConstant.FANOUT_QUEUE_SMS_NAME);
    }

    /**
     * 邮件任务队列
     *
     * @return {@link Queue}
     */
    @Bean
    public Queue emailQueue() {
        // 持久化、排他性队列、关闭自动删除

        // TTL队列测试
        Map<String, Object> args = new HashMap<>(1);
        // 整形 毫秒
        args.put("x-message-ttl", 5000);
        return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME, true, false, false以上是关于RabbitMQ 学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ简单学习笔记

RabbitMQ学习笔记

RabbitMq学习笔记

学习笔记编程不良人老师的RabbitMQ教程的学习笔记

Python 学习笔记 - RabbitMQ

RabbitMq学习笔记