RabbitMQ简单应用の公平分发(fair dipatch)

Posted pengpengzhang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ简单应用の公平分发(fair dipatch)相关的知识,希望对你有一定的参考价值。

公平分发(fair dipatch)和轮询分发其实基本一致,只是每次分发的机制变了,由原来的平均分配到现在每次只处理一条消息

1.MQ连接工厂类Connection

技术分享图片
 1 package com.mmr.rabbitmq.util;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 
 8 public class ConnectionUtils {
 9     /**
10      * @desc 获取Mq 的链接
11      * @author zp
12      * @throws IOException 
13      * @date 2018-7-19
14      */
15     public static  Connection getConnection() throws IOException {
16         // 1.定义一个链接工厂
17         ConnectionFactory factroy = new ConnectionFactory();
18         
19         // 2.设置服务地址
20         factroy.setHost("127.0.0.1");
21         
22         // 3.设置端口号
23         factroy.setPort(5672);
24         
25         // 4.vhost  设置数据库
26         factroy.setVirtualHost("vhtest");
27         
28         // 5.设置用户名
29         factroy.setUsername("jerry");
30         
31         // 6. 设置密码
32         factroy.setPassword("123456");
33         
34         // 7.返回链接
35         return factroy.newConnection();
36     }
37 }
View Code

2.消息生产者Send,这里的变化是声明了“每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息” channel.basicQos(intnum);

技术分享图片
 1 package com.mmr.rabbitmq.workfair;
 2 
 3 import java.io.IOException;
 4 
 5 import com.mmr.rabbitmq.util.ConnectionUtils;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 
 9 public class Send {
10     
11     /*
12      *         |--C1
13      * P-------|--C2
14      *         |--C3
15      * 
16      * */
17     private static final String QUEUE_NAME="test_work_queue";
18     public static void main(String[] args) throws IOException, InterruptedException{
19         // 获取链接
20         Connection connection = ConnectionUtils.getConnection();
21         
22         // 获取通道
23         Channel channel = connection.createChannel();
24         // 声明队列
25         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
26         /*
27          * 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
28          * 
29          * 限制发送给同一个消费者只能发送一条
30          * */
31         int prefetchCount =1;
32         channel.basicQos(prefetchCount);
33         
34         
35         for (int i = 0; i < 50; i++) {
36             String msg = "hello "+i;
37             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
38             System.out.println("send msg 的第"+i+"条");
39             Thread.sleep(i*20);
40         }
41         channel.close();
42         connection.close();
43     }
44 }
View Code

3.消息处理者(消费者)Recv1 Recv2,这里的区别在于:

(1)每次只处理1条消息channel.basicQos(1);

(2)并且在消息处理完之后会手动返回回执单 channel.basicAck(envelope.getDeliveryTag(), false);

(3)最后将之前的自动应答true改为false boolean autoAck = false;

技术分享图片
 1 package com.mmr.rabbitmq.workfair;
 2 
 3 import java.io.IOException;
 4 
 5 import com.mmr.rabbitmq.util.ConnectionUtils;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.Consumer;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 
13 public class Recv1 {
14     private static final String QUEUE_NAME="test_work_queue";
15     public static void main(String[] args) throws IOException{
16         // 获取链接
17         Connection connection = ConnectionUtils.getConnection();
18         
19         //获取频道
20         
21         final Channel channel = connection.createChannel();
22         
23         // 声明队列
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26         // 关闭自动应答
27         channel.basicQos(1); // 保证每次只被分发一个
28         
29         // 定义一个消费者
30         Consumer consumer = new DefaultConsumer(channel){
31             // 一旦有消息 就会触发这个方法  消息到达 
32             @Override
33             public void handleDelivery(String consumerTag, Envelope envelope,
34                     BasicProperties properties, byte[] body) throws IOException {
35                 // TODO Auto-generated method stub
36                 // 拿消息
37                 String msg = new String(body,"utf-8");
38                 
39                 //搭出来
40                 System.out.println("[1]Recv msg:"+msg);
41                 try {
42                     Thread.sleep(2000);
43                 } catch (Exception e) {
44                     // TODO: handle exception
45                     e.printStackTrace();
46                 }finally{
47                     System.out.println("[1] done");
48                     // 手动回执
49                     channel.basicAck(envelope.getDeliveryTag(), false);
50                 }
51             }
52         };
53         // boolean autoAck = true; // 自动应答改为false
54         boolean autoAck = false;
55         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
56         
57     }
58 }
View Code
技术分享图片
 1 package com.mmr.rabbitmq.workfair;
 2 
 3 import java.io.IOException;
 4 
 5 import com.mmr.rabbitmq.util.ConnectionUtils;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.Consumer;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 
13 public class Recv2 {
14     private static final String QUEUE_NAME="test_work_queue";
15     public static void main(String[] args) throws IOException{
16         // 获取链接
17         Connection connection = ConnectionUtils.getConnection();
18         
19         //获取频道
20         
21         final Channel channel = connection.createChannel();
22         
23         // 声明队列
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26         // 保证每次只接收一条消息
27         channel.basicQos(1);
28         
29         // 定义一个消费者
30         Consumer consumer = new DefaultConsumer(channel){
31             // 一旦有消息 就会触发这个方法  消息到达 
32             @Override
33             public void handleDelivery(String consumerTag, Envelope envelope,
34                     BasicProperties properties, byte[] body) throws IOException {
35                 // TODO Auto-generated method stub
36                 // 拿消息
37                 String msg = new String(body,"utf-8");
38                 
39                 //搭出来
40                 System.out.println("[2]Recv msg:"+msg);
41                 try {
42                     Thread.sleep(1000);
43                 } catch (Exception e) {
44                     // TODO: handle exception
45                     e.printStackTrace();
46                 }finally{
47                     System.out.println("[2] done");
48                     // 手动回执
49                     channel.basicAck(envelope.getDeliveryTag(), false);
50                 }
51             }
52         };
53         // boolean autoAck = true; // 自动应答改为false
54         boolean autoAck = false;
55         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
56         
57     }
58 }
View Code

 

以上是关于RabbitMQ简单应用の公平分发(fair dipatch)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ02_简单模式Publish/Subscribe发布与订阅模式Routing路由模式Topics通配符模式Work模式-轮询公平

RabbitMQ02_简单模式Publish/Subscribe发布与订阅模式Routing路由模式Topics通配符模式Work模式-轮询公平

RabbitMQ——消息手动应答队列/消息持久化不公平分发预取值的概念理解及应用举例

RabbitMQ——消息手动应答队列/消息持久化不公平分发预取值的概念理解及应用举例

Java使用RabbitMQ之公平分发

RabbitMQ-08 不公平分发与预取值