分布式消息队列--RabbitMQ

Posted Jafeiyn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式消息队列--RabbitMQ相关的知识,希望对你有一定的参考价值。

消息队列MQ

MQ优劣势

1.优势
  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性
2. 劣势
  • 引入MQ提高了系统复杂度,需考虑如何保证消息不被丢失的情况
  • 引入MQ降低了系统可用性,需考虑如何保证MQ的高可用

常见MQ对比

RabbitMQ

RabbitMQ是一款基于AMQP协议(高级消息队列网络协议)标准、采用Erlang语言开发的消息中间件,基于AMQP协议的客户端与中间件传递消息不受两端的不同产品不同开发语言的条件限制。

核心概念

  1. Broker:即rabbitmq-server,核心消息处理中心,负责接收与分发消息,内部包含很多个不同租户的虚拟主机
  2. Virtual Hosts:虚拟主机,类似数据库的概念,用于区分多租户的数据隔离,内部包含交换机和对应绑定的队列
  3. Connection:消息发送者或消息接收者与Broker之间的TCP(长)连接
  4. Channel:消息通道,是一种在Connection内部建立的逻辑连接,客户端程序的每个线程对应一个channel,channel之间是完全隔离的,AMQP Method包含了channel id,当有客户端要与Broker建立连接时,会通过channel id识别出对应channel进行数据传输,这样Channel作为轻量级的连接极大减少了系统建立TCP连接的开销
  5. Exchange:交换机,内部有消息分发规则,消息从发送端发到Broker的Exchange,再根据分发规则匹配查询表中的路由key将消息分发到绑定的队列中(常见的交换机类型有direct,topic,fanout)
  6. Queue:队列,实际存储消息的临时容器
  7. Binding:绑定,即交换机与队列之间的虚拟连接,Binding中可以包含路由key,Binding信息保存到交换机的查询表中用于消息分发

核心原理

消息可靠性(消息防丢失)
  1. 持久化(exchange持久化、queue持久化、消息持久化)
  2. 消息发送者Confirm确认
  3. 消息接收者Ack确认
  4. Broker高可用
延迟队列

消息进入队列后不会立即被消费,只有到达指定时间后才会被消费;
RabbitMQ默认不支持延迟队列
实现方式:定时器、延迟队列(TTL+死信队列)
应用场景:商品下单后30min未支付则取消订单回滚交易、用户注册7days后短信问候等

死信队列
  • 死信交换机和死信队列和普通的没有区别
  • 当消息成为死信后,若该队列绑定了死信交换机,则消息会被死信交换机路由到死信队列
  • 消息成为死信的三种情况:
  1. 队列消息长度超过限制
  2. 消费者不消费消息,并且不重回队列
  3. 消息到达超时时间未消费
消息重复消费

避免消息被重复消费,需要实现消息幂等性保障方案:

  1. 对消息接收者使用数据库乐观锁机制
  2. redis原子性操作setnx实现(消息若需要入库则需要考虑如何保证缓存与数据库操作的原子性;若不需要入库则需要考虑定时同步方案,或者双缓存异步同步到缓存/数据库方案,注意要有一个回调函数保证最终一致性)
消息积压

使用工作队列模式,增加消息接收者和相应队列消费消息,将消息先批量取出来记录数据库,然后慢慢处理

五种工作模式

  1. 简单队列模式

    一个消息发送者对应一个消息接收者

  2. 工作队列模式

    一个消息发送者对应多个消息接收者,队列中的每个消息只能被一个消息接收者消费,适用轮询分发消息的场景

  3. fanout发布订阅模式

    一个消息发送者可能对应多个消息接收者;
    消息从发送者发到Broker后首先进入到交换机(X)中,X根据分发规则匹配查询表中的路由key将消息分发给bingding的队列,消息接收者监听对应的队列消费消息(交换机的类型主要有四种:direct, fanout, topic, headers)

  4. direct路由模式

    路由模式是以指定的完整路由key将消息发送者发送的消息发送给指定消息接收者;
    消息发送者携带指定路由key将消息发送到direct交换机,然后通过交换机的路由匹配规则转发到binding的队列中,消息接收者监听指定队列消费消息

  5. topic主题模式(通配符模式)

    topic模式即通配符匹配路由key,通配符包含“*”和“#”,星号代表匹配一个单词,而#代表匹配零个、一个或多个单词;
    消息发送者携带指定路由key将消息发送到topic交换机,然后通过交换机的通配符匹配规则转发到binding的队列中,消息接收者监听指定队列消费消息

单机搭建部署

安装环境:CentOS-7.*

  1. 安装依赖环境

进入/usr/local目录
cd /usr/local

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

  1. 安装Erlang
    上传已下载好的rabbitmq-server-3.7.18-1.el7.noarch.rpm, erlang-22.0.7-1.el7.x86_64.rpm

rz 选择要上传的文件

安装erlang
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

  1. 安装依赖环境socat

yum install -y socat

  1. 安装rabbitmq-server

rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

  1. 开启管理界面及配置

启用管理界面
rabbitmq-plugins enable rabbitmq_management

修改默认配置,如可以修改密码,修改loopback_users等配置
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app

修改rabbit.app配置文件中loopback_users,将其值由[<<“guest”>>]改为[guest]后保存并退出即可(注:此操作用于设置浏览器访问rabbitmq控制台界面的用户名)

:wq

  1. 启动rabbitmq

service rabbitmq-server start (启用服务)

service rabbitmq-server stop(停止服务)
service rabbitmq-server restart(重启服务)

  1. 验证

service rabbitmq-server status(查看服务状态)

rabbitmq-server服务启动后,浏览器访问 http://192.168.126.133:15672,输入用户名密码(都是guest)即可进入控制台界面

基本使用操作
  1. 新建用户

  2. 新建虚拟主机(虚拟主机可以理解为数据库)

  3. 虚拟主机关联用户
    点击已添加的虚拟主机jeffrey_host,选择要授权的用户并配置其相关权限规则,然后点击Set permission.


  4. 新建队列(队列可以理解为数据表)

  5. 新建交换机

SpringBoot整合RabbitMQ(注:以下示例为topic模式,其他模式相应调整下对应的配置类即可)
  1. 引入依赖
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 修改application.yml
server:
  port: 8080

#rabbitmq singleton config
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 192.168.126.134
    port: 5672
    username: jeffrey
    password: zfy
    virtual-host: jeffrey_host
    topicExchange: jeffrey_topic_exchange
    topicQueue1: jeffrey_topic_queue1
    topicQueue2: jeffrey_topic_queue2
  1. 添加配置类
package com.itjeffrey.rabbitmq.test.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * topic模式配置(有路由key, 模糊匹配,有交换机)
 * @From: Jeffrey
 * @Date: 2022/11/19
 */
@Configuration
public class TopicConfig 

    @Value("$spring.rabbitmq.topicExchange")
    private String exchange;

    @Value("$spring.rabbitmq.topicQueue1")
    private String queue1;

    @Value("$spring.rabbitmq.topicQueue2")
    private String queue2;

    //声明交换机
    @Bean
    public Exchange topicExchange()
        return ExchangeBuilder.topicExchange(exchange).durable(false).build();
    

    //声明队列
    @Bean
    public Queue topicQueue1()
        return QueueBuilder.nonDurable(queue1).build();
    

    @Bean
    public Queue topicQueue2()
        return QueueBuilder.nonDurable(queue2).build();
    

    //队列绑定交换机,并指定路由key
    @Bean
    public Binding topicBinding1()
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("TR.*").noargs();
    

    @Bean
    public Binding topicBinding2()
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("TR.#").noargs();
    

  1. 消息发送者
package com.itjeffrey.rabbitmq.test.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/**
 * @From: Jeffrey
 * @Date: 2022/11/19
 */
@Service
public class Producer 

    @Value("$spring.rabbitmq.topicExchange")
    private String topicExchange;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //topic模式
    @Scheduled(cron = "0/10 * * * * ?")
    public void send5()
        String msg = "test message";
        rabbitTemplate.send(topicExchange, "TR.A.B",
                new Message(msg.getBytes(StandardCharsets.UTF_8), new MessageProperties()));
        System.out.println("send topic msg!");
    

  1. 消息接收者
package com.itjeffrey.rabbitmq.test.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @From: Jeffrey
 * @Date: 2022/11/19
 */
@Service
public class Consumer 

    @RabbitListener(queues = "jeffrey_topic_queue1")
    public void listen1(Message message)
        System.out.println("receive topic_queue1 msg -> " + new String(message.getBody()));
    

    @RabbitListener(queues = "jeffrey_topic_queue2")
    public void listen2(Message message)
        System.out.println("receive topic_queue2 msg -> " + new String(message.getBody()));
    
    

  1. 控制台输出

集群搭建部署

企业集群部署一般采用一个HAProxy负载均衡器代理和两个RabbitMQ-Server服务分别部署在三个不同服务器节点的方式;也可以采用HAProxy+KeepAlived一主一备作为负载均衡代理集群和三台RabbitMQ-Server服务器集群的方式增强MQ的高可用性

企业级分布式集群部署步骤如下:

  1. 环境准备:修改两台rabbitmq服务器的主机名s1, s2;
    使用命令 “hostnamectl set-hostname 主机名” 修改各服务器的主机名

hostnamectl set-hostname s1
hostnamectl set-hostname s2

  1. 同步.erlang.cookie文件:s1中执行将s1中的.erlang.cookie文件同步到s2中

#s2可以替换为ip
scp /var/lib/rabbitmq/.erlang.cookie s2:/var/lib/rabbitmq/.erlang.cookie

  1. 添加集群节点:s2中执行重启rabbitmq服务,加入集群

#stop_app禁止rabbitmq接收外部请求
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@s2
#start_app允许rabbitmq接收外部请求
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server.service

  1. 查看集群信息

rabbitmqctl cluster_status

  1. 在第三台服务器上安装启动HAProxy

#安装haproxy
yum install haproxy
#修改haproxy.cfg配置文件
vi /etc/haproxy/haproxy.cfg

#添加配置
#监听rabbitmq集群
listen rabbitmq_cluster
#通过端口5672映射s1,s2
bind 0.0.0.0:5672
#记录tcp连接状态和连接时间
option tcplog
#四层协议代理,对tcp协议转发
mode tcp
#开启tcp的keep alive长连接模式
option clitcpka
#haproxy与rabbitmq-server的连接超时时间
timeout connect 1s
#client端与haproxy最大空闲时间
timeout client 10s
#server端与haproxy最大空闲时间
timeout server 10s
#轮询转发消息
balance roundrobin
#每5s发一次心跳包,连续两次有响应则表示连接状态良好,连续三次无响应则表示服务故障该节点将被踢除
server node1 192.168.126.134:5672 check inter 5s rise 2 fall 3
server node2 192.168.126.133:5672 check inter 5s rise 2 fall 3
#开启haproxy前端监控服务
listen http_front
#绑定监控服务端口1080
bind 0.0.0.0:1080
#每30s刷新一次监控统计页面
stats refresh 30s
#指定监控统计页面uri
stats uri /haproxy_stats
#指定监控统计页面的访问用户名和密码
stats auth admin:admin

:wq
#启动haproxy
systemctl start haproxy
#查看haproxy状态
systemctl status haproxy.service

  1. 访问如下地址监控RabbitMQ集群节点

http://haproxy服务器IP:1080/haproxy_stats

  1. 代码连接RabbitMQ集群地址改为:haproxy服务器IP:5672

(注:本篇文章部分图例引用自网络)

以上是关于分布式消息队列--RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)

SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列死信备份交换机

.Net Core&RabbitMQ死信队列

RabbitMQ学习教程二(交换机,死信队列)

RabbitMQ死信队列

分布式消息通信之RabbitMQ_01