RabbitMQ:工作队列模式

Posted 不断前进的皮卡丘

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ:工作队列模式相关的知识,希望对你有一定的参考价值。

✨ RabbitMQ:工作队列模式


📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

1.基本介绍

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们将任务安排在以后完成。我们将_任务_封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作线程时,任务将在它们之间共享。
这个概念在 Web 应用程序中特别有用,因为在 Web 应用程序中,不可能在较短的 HTTP 请求窗口中处理复杂的任务。

在Work Queues工作队列模式中,我们不需要设置交换机(会使用默认的交换机进行消息转换),但是我们需要指定唯一的消息队列来进行消息传递,可以有多个消费者。
多个消费者通过轮询的方式来依次接收消息队列中存储的消息,一旦消息被某个消费者接收了,消息队列就会把消息移除,其他消费者就不能接收这条消息了。消费者必须要等消费完一条消息后才可以准备接收下一条消息。
对于任务过重或者任务比较多的情况,使用工作队列可以提高任务处理速度

2.轮询发送消息

1.如果一个队列中有多个消费者,那么消费者之间对于同一个消息是竞争关系
2.对于任务过重或者任务比较多的情况,使用工作队列可以提高任务处理速度,比如发送短信,我们可以部署多个短信服务,只要有一个节点发送成功即可。

2.1抽取工具类

public class ConnectUtil 
    public static Connection getConnection() throws IOException, TimeoutException 
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置连接参数
        //服务器IP地址
        factory.setHost("192.168.88.133");
        //连接端口
        factory.setPort(5672);
        //设置连接的虚拟机名称
        factory.setVirtualHost("/myhost");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        //2.创建Connection对象
        Connection connection = factory.newConnection();
        return connection;
    

    /**
     * 创建信道对象
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Channel getChannel() throws IOException, TimeoutException 
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        return channel;

    



2.2 生产者

public class Producer 
    static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) 
        try 
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //发送消息
            for (int i = 1; i <= 10; i++) 
                String msg="hello rabbitmq!"+i;
                /**
                 * 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
                 * 参数2:队列名称(路由key)
                 * 参数3:其他参数
                 * 参数4:消息内容
                 */
                channel.basicPublish("", QUEUE_NAME, null,msg.getBytes() );
            
            System.out.println("消息已经发送完毕");


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    


2.3消费者

消费者1

public class Consumer1 
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) 

        try 
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) 
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));

                
            ;
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, true, consumer);


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        


    


消费者2

public class Consumer2 
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) 

        try 
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) 
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));

                
            ;
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, true, consumer);


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        


    


2.4测试

为了方便测试,我们需要先启动消费者,然后再启动生产者,不然生产者发送的消息会瞬间被某个消费者消费完

3.消息应答

3.1消息应答基本介绍

👀我们知道消费者完成一个任务是需要一定的时间的,如果消费者在处理一个长任务的时候,当它只处理一部分但是此时消费者却挂掉了,可能会出现下面的情况:
👀如果说RabbitMQ向消费者传递一条消息以后,不管消费者有没有处理完或者有没有接收到,就马上把消息标记为删除,那么,如果这个时候消费者挂掉了,就会导致丢失当前正在处理的消息,以及后续发送给消费者的消息,因为消费者不能接收到。
👀为了保证消息在发送过程中不会丢失,RabbitMQ引入了消息应答机制,消息应答就是消费者在接收到消息并且处理该消息以后,告诉RabbitMQ它已经处理了,RabbitMQ就可以把这个消息从消息 队列中删除了。

3.2消息自动应答

  • 消息发送后就马上认为已经传递成功了,这种模式需要在高吞吐量和数据传输安全性方面做权衡。因为如果使用这种模式,如果消息在被接收到之前,消费者那么出现连接或者信道关闭,那么消息就会丢失;不过,对于这种模式来说,消费者那里可以传递过载的消息,没有对传递的消息数量进行限制,这样就可能使得消费者这边因为接收了太多还来不及处理的消息,导致消息积压,最后使得内存耗尽,导致这些消费者线程被操作系统杀死,所以这种模式仅仅适用消费者可以高效并且以某种苏联能够处理这些消息的情况下使用。
  • 信息过载:是指社会信息超过了个人或系统所能接受、处理或有效利用的范围,并导致故障的状况。

3.3消息手动应答

  • 消费者从队列中消费消息,默认采用的是自动应答,自动应答可能导致消息没有完全消费而导致消息失效问题,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。而且,使用手动应答可以批量应答减少网络拥堵,下面三个方法可以用于手动应答消息:
  • Channel。basicAck():用于肯定确认,RabbitMQ已经知道消息被消费并且成功处理消息,可以把消息丢弃。
  • Channle.basicNack():用于否定确认
  • Channel.basicReject():用于否定确认,不处理该消息直接拒绝,然后把消息丢弃

3.4批量确认(Multiple)

批量确认的方法是channel.basicAck(deliverTag,true),参数2标识是否批量确认。如果为true,表示批量确认队列中没有应答的消息。
比如channel中传送tag的消息5,6,7,8,当前tag为8,如果参数2为true,那么此时5-8这些还没有被应答的消息都会被确认收到消息应答。如果为false,那么只会应答tag=8的消息。5,6,7这三个消息仍然不会被确认收到消息应答

3.5消息自动重新入队

如果一个消费者死了(它的通道被关闭,连接被关闭,或者TCP连接丢失)而没有发送一个ack,RabbitMQ就会明白一条消息没有被完全处理,并会重新排队。如果同时有其他消费者在线,它将迅速将其重新交付给另一个消费者。通过这种方式,您可以确保即使消费者偶尔死亡,也不会丢失任何消息。

3.6消息手动应答代码

生产者

public class Producer2 
    static final String QUEUE_NAME="ack_work_queue";
    public static void main(String[] args) 
        try 
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //发送消息
            for (int i = 1; i <= 10; i++) 
                String msg="你好,小兔子!"+i;
                /**
                 * 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
                 * 参数2:队列名称(路由key)
                 * 参数3:其他参数
                 * 参数4:消息内容
                 */
                channel.basicPublish("", QUEUE_NAME, null,msg.getBytes() );
            
            System.out.println("消息已经发送完毕");


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    


消费者1

public class Consumer3 
    static final String QUEUE_NAME = "ack_work_queue";

    public static void main(String[] args) 

        try 
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("消费者1-消费消息的时间比较短。");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) 
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                   //睡眠一秒
                    SleepUtil.sleep(1);
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
                    //手动确认
                    //每条消息都有对应的id,表明是第几条消息,false表示不批量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                
            ;
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, false, consumer);


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        


    


消费者2

public class Consumer4 
    static final String QUEUE_NAME = "ack_work_queue";

    public static void main(String[] args) 

        try 
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("消费者2-消费消息的时间比较长。");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) 
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                   //睡眠一秒
                    SleepUtil.sleep(50);
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
                    //手动确认
                    //每条消息都有对应的id,表明是第几条消息,false表示不批量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                
            ;
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, false, consumer);


         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        


    


3.7消息手动应答效果

第一次测试,两个消费者都睡眠1秒



第二次测试,让消费者2睡眠30秒,然后观察两个消费者的消费情况,



接着把消费者2停掉,再次观察消费者1控制台打印的消息,发现队列中没有被消费的消息重新进入到队列中,并且被消费者1进行消费

4.消息的持久化

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍将丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列在 RabbitMQ 节点重新启动后仍能存活下来。为此,我们需要将其声明为_持久:_
如果我们之前创建的队列是非持久化的,如果RabbitMQ重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把durable参数设置为持久化;

4.1队列持久化

**如果之前创建队列的时候,没有设置成持久化,我们需要把原来的队列先删除掉,或者说重新创建一个新的持久化队列,不然会报错。因为RabbitMQ 不允许我们使用不同的参数重新定义现有队列,并且会向任何尝试执行此操作的程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列, **

// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

4.2消息持久化

我们需要将消息标记为持久性 - 通过将消息属性(实现基本属性)设置为PERSISTENT_TEXT_PLAIN的值。

//交换机名称,队列名称,消息持久化,消息
channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 接受消息但尚未保存消息时,仍有一个较短的时间窗口。另外, RabbitMQ 不会对每条消息都执行 fsync( fsync函数同步内存中所有已修改的文件数据到储存设备。) – 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认模式。

4.3公平调度

int prefetchCount = 1;
channel.basicQos(prefetchCount);

以上是关于RabbitMQ:工作队列模式的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ的六种工作模式

rabbitmq 重复ACK导致消息丢失

四RabbitMQ中模式—工作队列(Work Queues)模式

RabbitMQ六种队列模式-工作队列模式

RabbitMQ入门订阅模式

RabbitMQ 原文译04--发布和订阅