RabbitMQ 超详细入门篇

Posted Java.慈祥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 超详细入门篇相关的知识,希望对你有一定的参考价值。

RabbitMQ 入门篇🚪

MQ 的基本概念:

什么是 MQ ?

MQ全称为Message Queue即消息队列

  • "消息队列" 是在消息的传输过程中保存消息的容器

  • 它是典型的:生产者————消费者模型

    生产者不断向消息队列中生产消息 ———————— 消费者不断的从队列中获取消息.

    这样的好处: 生产者只需要关注发消息,消费者只需要关注收消息,二者没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦.

为什么要使用 MQ?

或者说MQ 有什么好处,MQ 主要可以实现三种功能:

服务解耦

  • 场景:服务A产生数据, 而服务B,C,D需要这些数据

    那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可;

  • 随着我们的应用规模不断扩大,会有更多的服务需要A的数据

    如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况 A服务中调用代码的维护会极为困难 程序非常的耦合

  • ,通过 MQ消息队列 可以实现,对程序的 解耦

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据

    下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

流量削峰

  • 场景:

    我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对 √

    但,在高峰期, 访问量瞬间翻了十倍, 达到每秒3000次请求, 单台服务器无法应对 我们增加到10台服务器,减压

  • 而,很多时候这种高压 每天只出现一次,每次只有半小时

    那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了

  • 使用MQ来进行流量削峰

    我们可以对于这种,可能会突然产生高请求的功能,设置一个MQ

    当用户发起请求后台并不会立刻处理,而是通过 MQ 发送一个请求,发送到队列里面,排队等待处理…

    我们的后台,接收者,发现队列中有消息,一个一个的取出,进行后台处理… 避免了同一时刻大量的请求,而处理不过来导致 服务崩溃~

异步调用

  • 场景:

    对于有些服务之间的调用会有很长时间的响应,而用户并不能接受这么时间的响应:

    A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完给页面响应…

  • 外卖支付

    相信大家都点过外卖,用户支付完成之后,到真正外卖到手是一个很漫长复杂的过程~ 我们不可能一直停留在页面上进行等待~

    支付后————发送支付成功的通知————再寻找外卖小哥来进行配送…

    而寻找外卖小哥的过程非常耗时,高峰期,可能要等待几十秒甚至更长,这样就造成整条调用链路响应非常缓慢

  • MQ解决方案:

    用户下单,订单数据可以发送到消息队列服务器,立刻响应客户端 为您寻找骑手,整条链路的响应时间只有200毫秒左右

    消息接收方,监听获取每一个订单消息后台缓慢的寻找外卖小哥~

AMQP 和 JMS

AMQP 和 JMS 是目前市面上常见的两种 消息队列协议

AMQP

  • AMQP 高级消息队列协议!

    是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS RabbitMQ 就是基于 AMQP 协议实现的

JMS

  • JMS Java 消息服务

    JMS的客户端之间可以通过JMS服务进行异步的消息传输

  • JMS(Java Message Service,Java消息服务)API是一个消息服务的标准或者说是规范

    就像JDBC一样通过接口定义一组规范,不同的实现尝试实现对于的驱动来完成开发...

    它使分布式通信耦合度更低,消息服务更加可靠以及异步性。 ActiveMQ 就是基于 JMS 规范实现的

总结:

规范:

  • AMQP 为消息定义了线路层(wire-level protocol)的协议

  • JMS所定义的是API规范

跨平台

  • Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差
  • AMQP天然具有跨平台、跨语言特性

支持消息类型

  • JMS 支持TextMessage、MapMessage 等复杂的消息类型
  • AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送

Exchange 交换机提供的路由算法

  • AMQP可以提供多样化的路由方式来传递消息到消息队列 4种交换机类型,6种模式
  • JMS 仅支持 队列 和 主题/订阅 方式两种

常见MQ产品:

  • ActiveMQ:基于JMS,早期的MQ框架,现在已经很少使用了
  • Kafka:分布式消息系统,高吞吐量
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好 本篇学习😶
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

RabbitMQ

  • 官方地址

  • RabbitMQ是由erlang语言开发,所以安装环境需要安装 erlang

  • 基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列

  • 它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛

RabbitMQ 的工作原理

组成部分:

Producer 消息生产者

  • 消息生产者,即生产方客户端,生产方客户端将消息,通过信道Channel发送到MQ

Connection 连接对象

  • Producer /Consumer 和 broker 之间的 TCP 连接

    程序通过,Connection连接对象将,创建出Channel信道生产者通过 信道 将消息发送给MQ 消费者通过 信道 获取到MQ的消息~

  • Channel 信道:

    如果每一次访问 RabbitMQ 都建立一个 Connection,消息量大的时候,对于性能也是巨大的;

    Channel 是在 connection 内部建立的逻辑连接,为 Connection 减少了操作系统建立 TCP connection 的开销; 细节不详细介绍

    可以理解为是一个,消息数据传递的一个通到

    可以通过它,来创建配置,生产者|消费者 与MQ通信 声明设置绑定:交换机|队列

Broker 可以认为是 MQ

  • 消息队列服务进程此进程包括两个部分:Exchange交换机和Queue队列

  • Exchange交换机

    是 RabbitMQ 非常重要的一个部件

    一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中

  • Queue 队列

    RabbitMQ 内部使用的一种数据结构队列 队列就像是一个“吸管” 一边吸水一边出水,遵循 “先进先出”原则;

    生产者发消息——交换机——转发到队列上 是真正消息存储的地方~

Consumer 消息消费者

  • 消息消费者,即消费方客户端,通关信道Channel接收MQ转发的消息,并进行相关的处理;

-----发送消息-----

  • 生产者通过 Connection 和Broker建立TCP连接。
  • Connection 建立 Channel 通道
  • 生产者通过信道,将消息发送给Broker(MQ),由Exchange将消息进行转发~ 队列中去!

-----接收消息-----

  • 消费者通过 Connection 和Broker建立TCP连接
  • Connection 建立 Channel 通道
  • 消费者监听指定的Queue(队列),当有消息到达Queue时Broker默认将消息,通过 Channel 推送给消费者

Exchange 交换机四种类型

RabbitMQ消息传递模型的核心思想是:

  • 生产者永远不会将任何消息直接发送到队列,通常生产者甚至不知道消息是否会被传递到任何队列 生产者只能向交换机(Exchange)发送消息

  • 交换机是一个非常简单的东西,一边接收来自生产者的消息,另一边将消息推送到队列.

  • RabbitMQ 的交换机具有很多中类型,可以完成很多种复杂的场景操作:

交换机类型:

  • fanout: 广播模式发布/订阅,交换机给所有的队列,发送相同的消息;

  • direct : 路由模式routing key 交换机,根据对应的 routing key 的队列上发送消息;

  • topic: 动态路由模式,可以用过一定的规则定义 roting key 使 交换机动态的多样性选择 队列

    * 表示一个单词

    # 表示任意数量(零个或多个)单词

  • headers: 请求头模式,目前用的很少了,就像请求头一样,发送消息时候附带头部数据,交换机根据消息的头部信息匹配对应的队列;

RabbitMQ环境搭建

本次搭建是Linux 的 如果有朋友是Win的话可以参考这篇文章:🚀

工具准备🔨:

RabbitMQ是由erlang语言开发,所以安装环境需要安装 erlang

  • erlang-21.3.8.21-1.el7.x86_64.rpm erlang环境
  • rabbitmq-server-3.8.8-1.el7.noarch.rpm rabbit安装

官网下载,如果没有的话也可以底部本人的网盘下载

环境搭建🏚:

本人使用的是 阿里云服务器 没有的话也可以使用虚拟机… 事先使用连接工具上传了文件

本人喜欢把工具都安装在 /usr/wsm 目录下:

[root@iZj6ciuzx7luldnazt4iswZ ~]# cd /
[root@iZj6ciuzx7luldnazt4iswZ /]# ls
bin   dev  home        lib    lost+found  mnt  patch  root  sbin  sys  usr  www
boot  etc  install.sh  lib64  media       opt  proc   run   srv   tmp  var
[root@iZj6ciuzx7luldnazt4iswZ /]# cd usr
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin  etc  games  include  lib  lib64  libexec  local  sbin  share  src  tmp
[root@iZj6ciuzx7luldnazt4iswZ usr]# mkdir wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin  etc  games  include  lib  lib64  libexec  local  sbin  share  src  tmp  wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# cd wsm
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm  rabbitmq-server-3.8.8-1.el7.noarch.rpm					#上传的两个文件

解压安装:

# 解压安装 erlang
rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
# 云下载一个 初始化一些配置, 过程比较慢请耐心等待~, 在这之后才可以进行 安装 RabbitMQ
yum install socat -y
# 解压安装 rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

ok ,安装完毕了解一些 RabbitMQ 命令:

# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server

注意:这里只是把RabbitMQ 服务给搭建好了,为了方便操作我们还需要安装一个web控制面板

# 安装web控制面板
rabbitmq-plugins enable rabbitmq_management

# 安装完毕以后,重启服务即可
systemctl restart rabbitmq-server

# 访问 http://服务器ip:15672 ,用默认账号密码(guest)登录,出现权限问题
# 默认情况只能在 localhost 本机下访问,所以需要添加一个远程登录的用户
# 创建账号和密码: admin 123456
rabbitmqctl add_user admin 123456
# 设置用户角色,用户级别: administrator monitoring policymaker managment
rabbitmqctl set_user_tags admin administrator
# 为用户添加资源权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>  # 添加配置、写、读权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

##### 扩展一些命令:#####
关闭应用的命令为:  rabbitmqctl stop_app
清除的命令为:		 rabbitmqctl reset
重新启动命令为:	rabbitmqctl start_app
  • 如果是阿里云的服务器 别忘记开启端口 还有 关闭防火墙~

用户级别:

  1. administrator:可以登录控制台、查看所有信息、可以对 rabbitmq 进行管理
  2. monitoring:监控者 登录控制台,查看所有信息
  3. policymaker:策略制定者 登录控制台,指定策略
  4. managment:普通管理员 登录控制台

主要端口介绍:阿里云建议将这些都打开~

  1. 4369 – erlang发现口

  2. 5672 – client端通信口

  3. 15672 – 管理界面ui端口

  4. 25672 – server间内部通信口

测试是否可以访问:

访问页面:

  • Overview

    概览 RabbitMQ 的整体情况,也可以查看集群各个节点的信息 情况 MQ 各个端口映射信息

  • Connection

    该 选项专栏 下是MQ 与各个 生产者 消费者 连接情况.

  • Channels

    这里展示,各个 通道 与 连接的关系

  • Exchanage

    展示所有的 交换机

  • Queue

    展示所有的 队列

  • Admin

    这里管理着,MQ 所有的操作用户~

RabbitMQ 管理页面:

Overview

Connections

  • Name 连接名 点击连接名, 还可以查看详细的信息~
  • User name 当前连接登录MQ 的用户
  • State 当前连接的状态,running 运行 idle 空闲
  • SSL|TLS 是否使用的是 SSL|TLS协议
  • Peotocol AMQP 0-9-1 指的是AMQP 的协议版本号
  • Channels 当前连接创建通道的 通道总数
  • From client 每秒发出的消息数
  • To client 每秒接收的消息数

Channels

记录各个连接的信道:

一个连接IP 可以有多个信道 多个通道通过多线程实现,不相互干扰 我们在 信道中创建:队列 交换机 ...

生产者的通道一般使用完之后会立马关闭,消费者是一直监听的…

  • Channel 通道名称

  • User Name 该通道,创建者 用户名

  • Model 通道的确认模式 C confirm模式 T 表示事务

  • State 通道当前的状态 running 运行 idie 空闲

  • Unconfirmed 待确认的消息数

  • Prefetch 预先载入

    Prefetch 表示每个消费者最大的能承受的未确认消息数目

    简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,

    一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了

    消费者负责不断处理消息,不断 ack,然后只要 UnAcked 数少于 Prefetch * consumer 数目,RabbitMQ 就不断将消息投递过去

  • Unacker 待 ack 的消息数

  • publish 消息生产者发送消息的 速率

  • confirm 消息生产者确认消息的 速率

  • unroutable drop 表示消息,未被接收,且已经删除的消息.

  • deliver / get 消息消费者获取消息的 速率

  • ack 消息消费者 ack 消息的速率. MQ 的 ACK机制:100%消息消费!

Exchange

Queue

  • Name 表示消息队列的名称
  • Type 消息队列的类型…
  • Features:表示消息队列的特性,D 表示消息队列持久化
  • State:表示当前队列的状态,running 表示运行中;idle 表示空闲
  • Ready:表示待消费的消息总数
  • Unacked:表示待应答的消息总数
  • Total:表示消息总数 Ready+Unacked
  • incoming:表示消息进入的速率
  • deliver/get:表示获取消息的速率
  • ack:表示消息应答的速

Admin

Java 集成 RabbitMQ 案例

创建一个Maven项目并使用 git 进行管理, wlog.md文件进行着项目日志的记录✍~

引入RabbitMQ 的依赖:

pom.xml

<dependencies>
    <!-- rabbitMQ 依赖 -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
</dependencies>

简单模式 Hello Word:

如图,显而易见,非常简单就是一个一发一读 的过程…

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

发送者

Producer.Java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** 消息生产者 **/
public class Producer 
    // 定义队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException 
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.243.109.199");
        factory.setUsername("admin");
        factory.setPassword("123456");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        //创建连接对象
        Connection connection = factory.newConnection();
        //根据连接对象,获取信道
        Channel channel = connection.createChannel();

        /**设置消息队列的属性!
         *  queue       :队列名称
         *  durable     :是否持久化 如果持久化,mq重启后队列数据还在! (队列是在虚拟路径上的...)
         *  exclusive   :队列是否独占此连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         *  autoDelete  :队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         *  arguments   :队列参数 null,可以设置一个队列的扩展参数,需要时候使用!比如:可设置存活时间
         * */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**发送消息,参数:
         * exchange     :指定的交换机,不指定就会有默认的....
         * routingKey   :路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机routingKey设置为队列的名称
         * props        :消息包含的属性: 后面介绍,可以是一个一个对象... 消息持久化配置...
         * body         :发送的消息,AMQP以字节方式传输...
         * */
        channel.basicPublish("", QUEUE_NAME, null, "Hello Word你好世界".getBytes());
        System.out.println("消息发送完毕");
    

消费者

Consumer.Java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** 消息消费者 **/
public class Consumer 
    // 定义队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.243.109.199");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("等待接收消息.........");
        //收到消息后用来处理消息的回调对象
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody());
            System.out.println(message);
        ;

        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> 
            System.out.println("消息消费被中断");
        ;
       	/**
         * 消费者消费消息 - 接受消息
         * queue            消费哪个队列
         * autoAck          消费成功之后是否要自动应答 true 代表自动应答 false 手动应答,要通过编程实现回复验证,这就是Unacked 为返回ack的数据
         * deliverCallback  消费方法,当消费者接收到消息要执行的方法, 参数是一个函数式接口可以使用 lambda表达式~
         * cancelCallback   消息被取消时的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    

  • 建议先启动 消费者 在启动 发送者

    可以看到,消费者启动之后,在等待 发生者 发送消息,发送者启动发送消息,消费者控制台会立刻接收到消息!

  • MQ 发送者一般情况下都不会直接忘队列发消息 这种情况下MQ 都会有一个默认的交换机~

工作模式 Work Queues

工作模式 相当于 简单模式的 升级版!

  • 多个消费者,对应一个发送者,发送者 产生的消息存在队列种,队列会以复杂均衡形式 轮询的发送给多个消费者

一般应用于:发送方事务简单,接收方事务复杂…

  • 美团外卖:用户下单——后台内部要联系商家 骑手 生产订单 处理...

抽取工作类:

因为上面示例我们知道,创建交换机|队列 需要Channel信道 交换机 队列是创建在信道里面的

  • 而每次创建交换机的时候,都要创建一次 Connection Channel
  • 于是我们可以将它抽离出一个工具类 MQChannelUtil.Java

MQChannelUtil.Java

  • com.wsm目录下创建一个 util包专门用来存储工具类🛠
import com.rabbitmq.client.Channel; //导入MQ的包~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/** RabbitMQ 连接配置类: **/
public class MQChannelUtil 
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception 
        //创建一个连接工厂, 设置连接: IP 端口 用户 密码
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("42.192.149.71");
//      factory.setPort("设置对应的端口,默认就是: 5672");
        factory.setUsername("admin");
        factory.setPassword("123456");
        //创建连接对象 信道对象
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    

发送者

Producer.Java

import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;

/** 消息生产者 **/
public class Producer 
    // 定义队列名称
    private final static String QUEUE_NAME = "Word";

    public static void main(String[] args) throws Exception 
        // 工具类创建一个信道
        Channel channel = MQChannelUtil.getChannel();
        // Java控制台测试法消息:
        Scanner scanner = new Scanner(System.in);
        //创建交换机
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 循环多次发布消息:
        while (scanner.hasNext())
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("消息发送完毕");
        
    

消费者1

Consumer1.Java

import com.rabbitmq.client.*;
import com.wsm.Util.MQChannelUtil;

/** 消息消费者 **/
public class Consumer1 
    // 定义队列名称
    private final static String QUEUE_NAME = "Word";

    public static void main(String[] args) throws Exception 
        // 工具类创建一个信道
        Channel channel = MQChannelUtil.getChannel();

        //收到消息后用来处理消息的回调对象
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody());
            System.out.println(message);
        ;

        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> 
            System.out.println("消息消费被中断");
        ;

        /** 消费者消费消息 - 接受消息: 注意参数两个回调函数~ */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    

消费者2

Consumer2.Java 和消费者1 一模一样换一个名字,两个消费者监听一个队列 进行数据处理....

结果测试:

消息被轮询消费

  • 通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
  • MQ 发送消息,一般情况下只会被一个 消费者执行消费 消费者执行之后, 队列就会将消息删除,(ACK机制...
  • 后面可以通过,交换机模式完成,一个消息被多个消费者消费…

消息确认接收机制 ACK

消息一旦被消费者接收,队列中的消息就会被删除

RabbitMQ怎么知道消息被接收了呢?

  • 如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败!

  • 但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制

  • 当消费者获取消息后,会向RabbitMQ发送回执ACK 告知消息已经被接收。不过这种回执ACK分两种情况:
  • 自动ACK: 消息一旦被接收,消费者自动发送ACK
  • 手动ACK: 消息接收后,不会发送ACK,需要手动调用

自动ACK

  • RabbitMQ 默认此种模式:

    消息发送后立即被认为已经传送成功! 消费者 接收到消息,就向队列发送ack,队列立刻就删除消息

  • 这种模式需要在高吞吐量和数据传输安全性方面做权衡 仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用

手动ACK

  • 消息接收后,不会发送ACK,需要手动代码进行调用 待消费者 执行完毕之后,在通过代码向 队列发送ack,队列接收到ack 之后会将消息删除!

  • channel.basicAck(long deliveryTag,boolean multiple); 用于肯定确认

    RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

    参数1 long类型,表示处理的消息标识,MQ没发送一个消息都一个对于该消息的唯一标识… 就像序列化 序列号一样,用于网络传输..

    参数2 boolean类型,表示是否支持批量处理

  • channel.basicNack(deliveryTag, false, true); 用于否定确认, 消费者 消息执行过程中失败,或服务器挂机…

    参数1 同上,消息的唯一标识

    参数2 表示是否支持批量处理

    参数3 requeue true则重新入队列 false丢弃或者进入死信队列

  • channel.basicReject(deliveryTag, true); 用于否定确认

    参数1 同上

    参数2 requeue true则重新入队列 false丢弃或者进入死信队列

    与 Channel.basicNack 相比少一个参数,不可以进行批量处理…

Multiple 批量消息处理:

  • true 代表批量应答处理

    比如,现在队列上存在 1 2 3 4 四个消息,都发送给了消费者,而消费者逐一处理,4 结束了.

    不管是否 ACK|NACK 都直接将,其它的 1 2 3 都以相同的,方式进行 批量处理!

    好处:在MQ 服务,稳定的时候,支持大量的消息处理速度… 缺点,容易造成数据丢失💀...

  • flase 建议使用,不批量应答

    就是, 一次只处理当前消息的 ACK|NACK

消息自动重新入队

消费者设置了手动ACK 之后....

如果消费者由于某些原因失去连接 其通道已关闭,连接已关闭或 TCP 连接丢失 导致消息未发送 ACK 确认

  • 消费者监听 队列消息,消费者开始处理,但是处理过程中,消费者突然与MQ 连接断开 消费者服务挂了
  • MQ 正常情况下会与 消费者建立连接,当消费者突然断开,一段时间没有返回,消息处理的 ack,MQ就会当作消费者出现故障. 将消息重新交给其它消费者处理!心跳机制♥

生产者

Producer.Java

import com.rabbitmq.clientRabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例

RabbitMQ入门案例

RabbitMQ延迟队列

RabbitMQ一文带你搞定RabbitMQ延迟队列

Rabbitmq通过死信队列实现过期监听

使用RabbitMQ的死信队列实现延迟消息