RabbitMQ消息队列笔记

Posted 知道什么是码怪吗?

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列笔记相关的知识,希望对你有一定的参考价值。

目录

RabbitMQ概念

四大核心概念

核心部分

测试"Hello World"

测试Work Queues

消息应答

自动应答

手动应答

消息应答方法

消息自动重新入队

手动应答测试

持久化

队列持久化

消息持久化

不公平分发

预取值

消息确认

开启发布确认

单个确认发布

批量确认发布

异步确认发布

处理异步未确认的消息


RabbitMQ概念

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。

四大核心概念

生产者:产生数据发送消息的程序是生产者。

交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个由交换机类型决定。

队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。

消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

核心部分

测试"Hello World"

接下来开始测试最基础的消息,前提是已经在Linux虚拟机当中开启了RabbitMQ服务并且在Web页面登录成功。

新建项目开启添加依赖

    <dependencies>
        <!--rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

目录结构

生产者代码

public class Producer 
    // 队列名称
    public static final String QUEUE_NAME = "hello";
​
    // 发消息
    public static void main(String[] args) throws IOException, TimeoutException 
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂IP连接RabbitMQ的队列
        factory.setHost("192.168.193.147");
        // 用户名
        factory.setUsername("admin");
        // 密码
        factory.setPassword("123");
        factory.setPort(5672);
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();
        /*
         * 生成一个队列
         * 参数1:队列名称
         * 参数2:队列里面的消息是否持久化,默认情况下,消息存储在内存中
         * 参数3:该队列是否只供一个消费者进行消费,是否进行消费共享,true可以多个消费者消费,
         *        false只能一个消费者消费
         * 参数4:是否自动删除:最后一个消费者断开连接之后,该队列是否自动删除,true则自动删除,
         *        false不自动删除
         * 参数5:其他参数
         * */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发消息
        String message = "hello world";
        /*
         * 发送一个消息
         * 参数1:发送到哪个交换机
         * 参数2:路由的key值是那个,本次是队列的名称
         * 参数3:其他参数信息
         * 参数4:发送消息的消息体
         * */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息发送完毕!");
    

​消费者代码

public class Consumer 
    // 队列名称
    public static final String QUEUE_NAME = "hello";
​
    // 接受消息
    public static void main(String[] args) throws IOException, TimeoutException, TimeoutException 
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.193.147");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明 接受消息
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println(new String(message.getBody()));
        ;
        // 声明 取消消息
        CancelCallback cancelCallback = consumer -> 
            System.out.println("消息消费被中断");
        ;
        /*
         * 消费者接收消息
         * 参数1:表示消费哪个UI列
         * 参数2:消费成功之后,是否需要自动应答,true表示自动应答,false表示手动应答
         * 参数3:消费者成功消费的回调
         * 参数4:消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    

先启动生产者,在启动消费者,可以在消费者一侧接收到生产者发送的信息,并且网页界面上将会显示通道信息。

测试Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

工作线程之间的关系是竞争关系,当消息来到时,轮询分发消息。一个消息只能被处理一次,不能被处理多次。

什么是轮询?可以理解为按顺序分发并且循环,类似于取余操作选取哪一个线程。

工具类

public class RabbitMqUtils 
    // 得到一个连接的channel
    public static Channel getChannel() throws IOException, TimeoutException 
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.193.147");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        com.rabbitmq.client.Channel channel = connection.createChannel();
        return channel;
    

消费者

public class Consumer 
    // 队列名称
    public static final String QUEUE_NAME = "hello";
    // 接受消息
    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMqUtils.getChannel();
        // 接受消息参数
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println("接受到的消息:" + new String(message.getBody()));
        ;
        // 取消消费参数
        CancelCallback cancelCallback = consumerTag -> 
            System.out.println(consumerTag + "消费者取消消费借口回调逻辑");
        ;
        // 消息的接受
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        System.out.println("C1接收消息~");
    

生产者

public class Producer 
    // 队列名称
    public static final String QUEUE_NAME = "hello";
​
    // 发送大量消息
    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 队列的声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 从控制台中输入消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) 
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息完成:" + message);
        
    

测试结果

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息以及后续发送给该消费这的消息,因为它已经挂掉了。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

手动应答分为:手动不确认、手动拒绝、手动确认应答三类。手动应答的好处是可以批量应答并且减少网络拥堵,建议使用手动应答。

消息应答方法

(1)Channel.basicAck(用于肯定确认)

RabbitMQ已知道该消息并且成功处理,可以将其丢弃

(2)Channel.basicNack(用于否定确认)

(3)Channel.basicReject(用于否定确认),与Channel.basicNack相比少了一个参数,不处理该消息了,直接拒绝,可以将其丢弃了。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

手动应答测试

工具类

public class SleepUtils 
    public static void sleep(int second)
        try 
            Thread.sleep(1000*second);
         catch (InterruptedException _ignored) 
            Thread.currentThread().interrupt();
        
    

消费者

这里我们需要启动两个消费者,两个消费者代码只有两处不同。复制粘贴新建即可。

public class Consumer1 
    // 队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
​
    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接受消息处理时间较短");//不同点1:C2较长
​
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            // 沉睡一秒
            SleepUtils.sleep(1);//不同点2:C2沉睡30秒
            System.out.println("接受到的消息是:" + new String(message.getBody()));
​
            //进行手动应答
            /*
             * 参数1:消息的标记  tag
             * 参数2:是否批量应答,false:不批量应答 true:批量应答
             * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        ;
​
        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> 
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        );
    

生产者

public class Producer
    // 队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException, TimeoutException 
        Channel channel = RabbitMqUtils.getChannel();

        // 声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) 
            String message = scanner.next();
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息:" + message);
        
    

测试结果

当处理信息的过程中,终止C2,那么C2所接收到的信息会重新入队,并被C1处理。

持久化

当RabbitMQ服务停掉以后,生产者发布的消息不丢失,我们需要将队列和消息都标记为持久化。

队列持久化

修改声明队列代码,将第二个参数改为true,该参数表示是否持久化这个队列。

// 声明队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

再次启动程序,在web界面查看。

如果启动时代码报错,需要先到该界面删除这个队列,再次启动程序。

 

消息持久化

修改消息发送代码,将第三个参数设置为 MessageProperties.PERSISTENT_TEXT_PLAIN 即让消息持久化,但是将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

//设置生产者发送消息为持久化消息
channel.basicPublish("",
                     TASK_QUEUE_NAME,
                     MessageProperties.PERSISTENT_TEXT_PLAIN, 
                     message.getBytes(StandardCharsets.UTF_8));

不公平分发

和轮询分发不同的是,不公平分发不是按照顺序分发,按照给定的分配方式进行分发,可以理解为能者多劳,谁处理速度快,谁就多分发点。

//设置分发方式
channel.basicQos(1);

 

只要C1为空闲的时候,就先分配给C1。

预取值

消息的发送是异步发送的,消费者的手动确认本质上也是异步的。因此存在一个消息缓冲区,通过限制此缓冲区的大小可以避免缓冲区里无限制的堆积未确认的消息。

设置预取值

设置预取值的代码和不公平分发的代码一致。

//设置预取值,数字即为缓冲区堆积信息的最大值
channel.basicQos(2);

给C1设置预取值5,给C2设置预取值2,发送7条数据,当C2缓冲区满时,不再接收新的数据直到缓冲区有空位,其余的数据全分发给C1。谁的缓冲区有空位,分发给谁。如果都没有空位且堆积的消息越来越多,可能会导致系统问题。

 

消息确认

生产者将信道设置为 confirm 模式,信道进入 confirm 模式之后,所有在该信道上面发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式是异步的,在等待信道返回确认通知的同时可以继续发送下一条消息。当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

开启发布确认

调用信道的 confirmSelect 方法。

channel.confirmSelect();//开启发布确认模式

获取信道代码

//得到信道
public static Channel getConfirmChannel(String queueName) throws Exception 
    Channel channel = RabbitMqUtils.getChannel();
    //队列的声明
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();
    return channel;

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有消息被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。

//单个发布确认
public static void publishMessageIndividually() throws Exception 
    String queueName = UUID.randomUUID().toString();
    Channel channel = getConfirmChannel(queueName);
    //开始时间
    long start = System.currentTimeMillis();
    for (int i = 0; i < COUNT; i++) 
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
        //单个消息马上进行发布确认
        boolean flag = channel.waitForConfirms();
        if (flag) System.out.println("消息发送成功~");
    
    //结束时间
    long end = System.currentTimeMillis();
    System.out.println("单个发布确认发送" + COUNT + "条信息所耗时间:" + (end - start) + "ms");

批量确认发布

先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

//批量信息确认
public static void publishMessageBatch() throws Exception 
    String queueName = UUID.randomUUID().toString();
    Channel channel = getConfirmChannel(queueName);
    long start = System.currentTimeMillis();
    for (int i = 0; i < COUNT; i++) 
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
        //每100条信息确认一次
        if (i + 1 % 100 == 0) channel.waitForConfirms();
    
    long end = System.currentTimeMillis();
    System.out.println("批量信息确认发送" + COUNT + "条信息所耗时间:" + (end - start) + "ms");

异步确认发布

性价比最高,利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

生产者将消息发送到信道,信道类似于一个Map,给每一条数据编号,通过编号查找哪些消息成功发送哪些消息没有发送成功。交换机接收信息之后通过函数回调来标记确认收到和未确认收到。

//异步发布确认
public static void publicMessageAsync() throws Exception 
    String queueName = UUID.randomUUID().toString();
    Channel channel = getConfirmChannel(queueName);
    
    long start = System.currentTimeMillis();
    // 消息确认成功回调函数
    ConfirmCallback ackCallback = (deliveryTag, multiply) -> 
        System.out.println("确认的消息:" + deliveryTag);
    ;
    // 消息确认失败回调函数
    /*
     * 参数1:消息的标记
     * 参数2:是否为批量确认
     * */
    ConfirmCallback nackCallback = (deliveryTag, multiply) -> 
        System.out.println("未确认的消息:" + deliveryTag);
    ;
    // 准备消息的监听器,监听哪些消息成功,哪些消息失败
    /*
     * 参数1:监听哪些消息成功
     * 参数2:监听哪些消息失败
     * */
    channel.addConfirmListener(ackCallback, nackCallback);
    // 批量发送消息
    for (int i = 0; i < COUNT; i++) 
        String message = "消息" + i;
        channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
    
    long end = System.currentTimeMillis();
    System.out.println("异步信息确认发送" + COUNT + "条信息所耗时间:" + (end - start) + "ms");

处理异步未确认的消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

以上是关于RabbitMQ消息队列笔记的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习笔记5:RabbitMQ高级

RabbitMQ 笔记-工作队列

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记