rabbit工作队列模式

Posted fuguang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbit工作队列模式相关的知识,希望对你有一定的参考价值。

工作队列比简单队列在消费者这边多了一个方法。

channel.basicQos(1);公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)

生产者:

 1 package com.kf.queueDemo.fairQueue;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.kf.utils.RabbitConnectionUtils;
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 /**
10  * 公平模式发送消息
11  * @author kf
12  *
13  */
14 public class FairProducer {
15     
16     //队列名称
17     private static String QUEUENAME = "SIMPLEQUEUE";
18     
19     public static void main(String[] args) throws IOException, TimeoutException{
20         Connection connection = RabbitConnectionUtils.getConnection();
21         
22         //创建通道
23         Channel channel = connection.createChannel();
24         
25         //通道里放入队列
26         /**
27          * 第一个参数是  队列名称
28          * 第二个参数指 要不要持久化
29          */
30         channel.queueDeclare(QUEUENAME, false, false, false, null);
31         
32 /*        //消息体
33         String mes = "demo_message汉字";
34         
35         //发送消息
36         *//**
37          * 参数为  exchange, routingKey, props, body
38          * exchange   交换机
39          * routingKey 路由键
40          * 
41          * body 消息体
42          *//*
43         channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/
44         
45         /**
46          * 集群环境下,多个消费者情况下。消费者默认采用均摊
47          */
48         for(int i=1; i<11; i++){
49             String mes = "demo_message汉字"+i;
50             System.out.println("发送消息"+mes);
51             channel.basicPublish("", QUEUENAME, null, mes.getBytes());
52         }
53         
54         
55 //        System.out.println("发送消息"+mes);
56         
57         channel.close();
58         connection.close();
59     }
60 
61 }

消费者1:

 1 package com.kf.queueDemo.fairQueue;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.kf.utils.RabbitConnectionUtils;
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 /**
13  * 公平模式消费者
14  * @author kf
15  *
16  */
17 public class FairConsumer1 {
18     //队列名称
19     private static String QUEUENAME = "SIMPLEQUEUE";
20     
21     public static void main(String[] args) throws IOException, TimeoutException{
22         System.out.println("01开始接收消息");
23         Connection connection = RabbitConnectionUtils.getConnection();
24         
25         //创建通道
26         final Channel channel = connection.createChannel();
27         
28         //通道里放入队列
29         /**
30          * 第一个参数是  队列名称
31          * 第二个参数指 要不要持久化
32          */
33         channel.queueDeclare(QUEUENAME, false, false, false, null);
34         
35         //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
36         channel.basicQos(1);
37         
38         DefaultConsumer consumer = new DefaultConsumer(channel){
39             //监听队列
40                 @Override
41                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
42                         byte[] body) throws IOException {try {
43                             Thread.sleep(500);
44                         } catch (Exception e) {
45                         }finally {
46                             System.out.println("------------进入监听---------");
47                             String s = new String(body, "utf-8");
48                             System.out.println("获取到的消息是:"+s);
49                             //手动应答。
50                             /**
51                              * 当  channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时  是手动应答模式
52                              */
53                             channel.basicAck(envelope.getDeliveryTag(), false);
54                         }}
55         };
56         
57         //设置应答模式
58         /**
59          * 参数:  对列名,是否自动签收,监听的类
60          */
61         System.out.println("获取消息的方法之前");
62         channel.basicConsume(QUEUENAME, false, consumer);
63         System.out.println("获取消息的方法之后");
64         
65     }
66 
67 
68 }

消费者2:

package com.kf.queueDemo.fairQueue;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
 * 公平模式消费者
 * @author kf
 *
 */
public class FairConsumer2 {
    //队列名称
    private static String QUEUENAME = "SIMPLEQUEUE";
    
    public static void main(String[] args) throws IOException, TimeoutException{
        System.out.println("02开始接收消息");
        Connection connection = RabbitConnectionUtils.getConnection();
        
        //创建通道
        final Channel channel = connection.createChannel();
        
        //通道里放入队列
        /**
         * 第一个参数是  队列名称
         * 第二个参数指 要不要持久化
         */
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        
        //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
                channel.basicQos(1);
        
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //监听队列
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                    }finally {
                        System.out.println("------------进入监听---------");
                        String s = new String(body, "utf-8");
                        System.out.println("获取到的消息是:"+s);
                        //手动应答。
                        /**
                         * 当  channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时  是手动应答模式
                         */
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                    
                }
        };
        
        //设置应答模式
        /**
         * 参数:  对列名,是否自动签收,监听的类
         */
        System.out.println("获取消息的方法之前");
        channel.basicConsume(QUEUENAME, false, consumer);
        System.out.println("获取消息的方法之后");
        
    }


}

 

以上是关于rabbit工作队列模式的主要内容,如果未能解决你的问题,请参考以下文章

Rabbit 5大模式

Rabbit简单队列模式

何时使用 RabbitMQ 声明/绑定队列和交换

rabbitMQ高可用方案

RabbitMQ_5主题模式

接收订单用redis做缓存好还是rabbit做消息队列好