RabbitMQ简单应用の公平分发(fair dipatch)
Posted pengpengzhang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ简单应用の公平分发(fair dipatch)相关的知识,希望对你有一定的参考价值。
公平分发(fair dipatch)和轮询分发其实基本一致,只是每次分发的机制变了,由原来的平均分配到现在每次只处理一条消息
1.MQ连接工厂类Connection
1 package com.mmr.rabbitmq.util; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 8 public class ConnectionUtils { 9 /** 10 * @desc 获取Mq 的链接 11 * @author zp 12 * @throws IOException 13 * @date 2018-7-19 14 */ 15 public static Connection getConnection() throws IOException { 16 // 1.定义一个链接工厂 17 ConnectionFactory factroy = new ConnectionFactory(); 18 19 // 2.设置服务地址 20 factroy.setHost("127.0.0.1"); 21 22 // 3.设置端口号 23 factroy.setPort(5672); 24 25 // 4.vhost 设置数据库 26 factroy.setVirtualHost("vhtest"); 27 28 // 5.设置用户名 29 factroy.setUsername("jerry"); 30 31 // 6. 设置密码 32 factroy.setPassword("123456"); 33 34 // 7.返回链接 35 return factroy.newConnection(); 36 } 37 }
2.消息生产者Send,这里的变化是声明了“每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息” channel.basicQos(intnum);
1 package com.mmr.rabbitmq.workfair; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 9 public class Send { 10 11 /* 12 * |--C1 13 * P-------|--C2 14 * |--C3 15 * 16 * */ 17 private static final String QUEUE_NAME="test_work_queue"; 18 public static void main(String[] args) throws IOException, InterruptedException{ 19 // 获取链接 20 Connection connection = ConnectionUtils.getConnection(); 21 22 // 获取通道 23 Channel channel = connection.createChannel(); 24 // 声明队列 25 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 26 /* 27 * 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息 28 * 29 * 限制发送给同一个消费者只能发送一条 30 * */ 31 int prefetchCount =1; 32 channel.basicQos(prefetchCount); 33 34 35 for (int i = 0; i < 50; i++) { 36 String msg = "hello "+i; 37 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 38 System.out.println("send msg 的第"+i+"条"); 39 Thread.sleep(i*20); 40 } 41 channel.close(); 42 connection.close(); 43 } 44 }
3.消息处理者(消费者)Recv1 Recv2,这里的区别在于:
(1)每次只处理1条消息channel.basicQos(1);
(2)并且在消息处理完之后会手动返回回执单 channel.basicAck(envelope.getDeliveryTag(), false);
(3)最后将之前的自动应答true改为false boolean autoAck = false;
1 package com.mmr.rabbitmq.workfair; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.Consumer; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 13 public class Recv1 { 14 private static final String QUEUE_NAME="test_work_queue"; 15 public static void main(String[] args) throws IOException{ 16 // 获取链接 17 Connection connection = ConnectionUtils.getConnection(); 18 19 //获取频道 20 21 final Channel channel = connection.createChannel(); 22 23 // 声明队列 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 // 关闭自动应答 27 channel.basicQos(1); // 保证每次只被分发一个 28 29 // 定义一个消费者 30 Consumer consumer = new DefaultConsumer(channel){ 31 // 一旦有消息 就会触发这个方法 消息到达 32 @Override 33 public void handleDelivery(String consumerTag, Envelope envelope, 34 BasicProperties properties, byte[] body) throws IOException { 35 // TODO Auto-generated method stub 36 // 拿消息 37 String msg = new String(body,"utf-8"); 38 39 //搭出来 40 System.out.println("[1]Recv msg:"+msg); 41 try { 42 Thread.sleep(2000); 43 } catch (Exception e) { 44 // TODO: handle exception 45 e.printStackTrace(); 46 }finally{ 47 System.out.println("[1] done"); 48 // 手动回执 49 channel.basicAck(envelope.getDeliveryTag(), false); 50 } 51 } 52 }; 53 // boolean autoAck = true; // 自动应答改为false 54 boolean autoAck = false; 55 channel.basicConsume(QUEUE_NAME, autoAck,consumer); 56 57 } 58 }
1 package com.mmr.rabbitmq.workfair; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.Consumer; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 13 public class Recv2 { 14 private static final String QUEUE_NAME="test_work_queue"; 15 public static void main(String[] args) throws IOException{ 16 // 获取链接 17 Connection connection = ConnectionUtils.getConnection(); 18 19 //获取频道 20 21 final Channel channel = connection.createChannel(); 22 23 // 声明队列 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 // 保证每次只接收一条消息 27 channel.basicQos(1); 28 29 // 定义一个消费者 30 Consumer consumer = new DefaultConsumer(channel){ 31 // 一旦有消息 就会触发这个方法 消息到达 32 @Override 33 public void handleDelivery(String consumerTag, Envelope envelope, 34 BasicProperties properties, byte[] body) throws IOException { 35 // TODO Auto-generated method stub 36 // 拿消息 37 String msg = new String(body,"utf-8"); 38 39 //搭出来 40 System.out.println("[2]Recv msg:"+msg); 41 try { 42 Thread.sleep(1000); 43 } catch (Exception e) { 44 // TODO: handle exception 45 e.printStackTrace(); 46 }finally{ 47 System.out.println("[2] done"); 48 // 手动回执 49 channel.basicAck(envelope.getDeliveryTag(), false); 50 } 51 } 52 }; 53 // boolean autoAck = true; // 自动应答改为false 54 boolean autoAck = false; 55 channel.basicConsume(QUEUE_NAME, autoAck,consumer); 56 57 } 58 }
以上是关于RabbitMQ简单应用の公平分发(fair dipatch)的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ02_简单模式Publish/Subscribe发布与订阅模式Routing路由模式Topics通配符模式Work模式-轮询公平
RabbitMQ02_简单模式Publish/Subscribe发布与订阅模式Routing路由模式Topics通配符模式Work模式-轮询公平
RabbitMQ——消息手动应答队列/消息持久化不公平分发预取值的概念理解及应用举例