#yyds干货盘点#RabbitMQ示例2:工作队列

Posted 汤圆学Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了#yyds干货盘点#RabbitMQ示例2:工作队列相关的知识,希望对你有一定的参考价值。

作者:汤圆

个人博客: javalover.cc

工作队列

目录

  • 定义
  • 基础DEMO
  • 知识点:轮询机制、公平机制、消息确认、消息持久

定义

比如web服务器,某个请求的后台操作可能要耗时十几分钟,此时如果没有工作队列,那么请求就会响应超时
但是有了工作队列,就可以先把请求要处理的任务放到工作队列中,然后快速响应请求

基础DEMO

1. 生产者

NewTask.java

public class NewTask 

    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) 
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ服务器地址(默认也是localhsot)
        factory.setHost("localhost");
        try 
            // 创建连接
            Connection connection = factory.newConnection();
            // 创建频道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 这里"hello."包含一个".",表示耗时1S
            String message = "hello.";
            for (int i = 0; i < 9; i++) 
                // 发布消息
                channel.basicPublish("", QUEUE_NAME,null, message.getBytes());
                System.out.println("send:" + message);
            
         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    

2. 消费者

Work.java

public class Work 
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try 
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("waiting for messages, to exit press CTRL+C");
            DeliverCallback callback = (s, delivery)->
                String s1 = new String(delivery.getBody(), "utf-8");
                System.out.println("received: "+s1);
                try 
                    doWork(s1);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                 finally 
                    System.out.println("done");
                
            ;
            channel.basicConsume(QUEUE_NAME, true, callback, consumeTag->);
         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    

    // 有多个".",就耗时多少秒
    private static void doWork(String s1) throws InterruptedException 
        for (String c: s1.split("")) 
            if(c.equals("."))
                Thread.sleep(1000);
            
        
    

知识点

1. 轮询机制

比如有一个生产者,三个消费者;生产者发了九条消息,那么三个消费者每人可以拿到三条消息

与之对应的是公平机制,下面会讲到

示例:

将上面的消费者程序Work再复制两份,总共三个消费者Work1.java,Work2.java,Work3.java

步骤:

  1. 依次启动这三个消费者

  2. 最后启动生产者程序NewTask.java

  3. 从控制台可以看到,平均每个消费者拿到三条消息

2. 消息确认

之所以默认手动,是因为自动存在隐患(消息丢失):

  • 如果消费者执行的任务比较耗时,那么有可能在任务执行过程中,该消费者挂掉
  • 但是此时消费者已经删除了这条消息,那么结果就是这条消息丢失
  • 而且给这个消费者发送的其他后续消息(已确认收到,但是还没处理)也一并丢失了。

但是手动不存在这个隐患,手动会把挂掉的消费者里面的消息重新分配到其他消费者

下面用表格列出:手动确认和自动确认的区别

手动确认 自动确认
消息丢失 不会 会(已被确认的消息,会被删除,导致消息丢失)
消息阻塞 会(忘记确认消息,导致该消费者接收的其他后续消息也无法处理) 不会

推荐手动确认的方式来告诉生产者,我收到消息了(一般在任务完成后,手动回复确认消息)

示例1:

修改Work3.java,其中1. 和2. 是修改的部分

// 2. === begin ===
// 注释掉手动确认的代码,假设忘记了手动确认
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("forget ack");
// 2. === end ===

// 1. === begin ===
// 这里将之前的自动确认改为手动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, callback, consumeTag->);
// 1. === end ===

步骤

  • 三个消费者顺序启动(Work1, Work2, Work3),然后启动生产者(NewTask)

  • 观察控制台,如下所示

示例2:

继续上面的示例1,代码不变

步骤:

  • 上面的例子中,每个消费者都有三条消息,其中Work3的三条消息都还没确认

  • 此时中断Work3,可以看到,Work1又执行了1条(总共4条),Work2又执行了2条(总共5条)

3. 消息持久化

上面的手动确认,只是保证了消费者如果挂掉,消息不会丢失
但是如果是RabbitMQ服务挂了呢?

可以通过设置参数来保证队列和消息基本不会丢失(不能完全保证不丢失,要完全保证不丢失,可以参考publisher confirms

// 1. 第二个参数设置为true,生产者和消费者都要改
channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
// 2. basic_properties设置为PERSISTENT_TEXT_PLAIN
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 

示例

修改NewTask.java

// 1. 重新定义一个队列:因为RabbitMQ不允许以不同的参数 重复定义同一个队列
private final static String QUEUE_NAME_DURABLE = "work_queue_durable";

// 2. 队列持久化:如果RabbitMQ服务挂了,保证队列还存在
boolean durable = true;
channel.queueDeclare(QUEUE_NAME_DURABLE, durable, false, false, null);

// 3.消息持久化:如果RabbitMQ服务挂了,保证消息还存在;
// 这里将basicProperties属性设置为 PERSISTENT_TEXT_PLAIN
channel.basicPublish("", QUEUE_NAME_DURABLE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

修改Work1.java,Work2.java,Work3.java,下面的代码同步修改到三个消费者中

// 1. 重新定义一个队列:因为RabbitMQ不允许以不同的参数 重复定义同一个队列
private final static String QUEUE_NAME_DURABLE = "work_queue_durable";

// 2. 队列持久化:如果RabbitMQ服务挂了,保证队列还存在
boolean durable = true;
channel.queueDeclare(QUEUE_NAME_DURABLE, durable, false, false, null);

// 3. 修改消费时的队列名称
channel.basicConsume(QUEUE_NAME_DURABLE, true, callback, consumeTag->);

4. 公平机制

前面的例子,我们看到,就算消费者忘记手动确认,RabbitMQ还是将消息均匀的分配给了每个消费者(即忘记确认的消费者后续还会收到消息),
其实这是不合理不公平的,因为这样就导致,那些忘记确认的消费者一直占着消息不去处理,造成消息阻塞,RabbitMQ占用内存也会越来越大

示例

修改刚才的Work3.java,因为Work3.java中忘记了手动确认,符合这个测试场景

// 设置公平分配策略,即消费者确认了一个消息后,RabbitMQ才会给它分配下一个消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);

步骤

  • 三个消费者顺序启动(Work1, Work2, Work3),然后启动生产者(NewTask)
  • 观察控制台,可以看到,Work3只接受了一条消息,Work1和Work2分别接收了4条消息

参考

RabbitMQ官网教程:第二节

以上是关于#yyds干货盘点#RabbitMQ示例2:工作队列的主要内容,如果未能解决你的问题,请参考以下文章

#yyds干货盘点#RabbitMQ示例5:主题topic交换机

#yyds干货盘点#RabbitMQ的简单模式案例讲解,非常详细

#yyds干货盘点#RabbitMQ的简单模式案例讲解,非常详细

#yyds干货盘点#「MQ」RabbitMQ的基本概念介绍,通俗易懂!

#yyds干货盘点# springcloud整合stream,rabbitmq实现消息驱动功能

#yyds干货盘点#使用php-amqplib实现RabbitMq