rabbit工作队列模式
Posted fuguang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbit工作队列模式相关的知识,希望对你有一定的参考价值。
工作队列比简单队列在消费者这边多了一个方法。
channel.basicQos(1);公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
生产者:
1 package com.kf.queueDemo.fairQueue; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 /** 10 * 公平模式发送消息 11 * @author kf 12 * 13 */ 14 public class FairProducer { 15 16 //队列名称 17 private static String QUEUENAME = "SIMPLEQUEUE"; 18 19 public static void main(String[] args) throws IOException, TimeoutException{ 20 Connection connection = RabbitConnectionUtils.getConnection(); 21 22 //创建通道 23 Channel channel = connection.createChannel(); 24 25 //通道里放入队列 26 /** 27 * 第一个参数是 队列名称 28 * 第二个参数指 要不要持久化 29 */ 30 channel.queueDeclare(QUEUENAME, false, false, false, null); 31 32 /* //消息体 33 String mes = "demo_message汉字"; 34 35 //发送消息 36 *//** 37 * 参数为 exchange, routingKey, props, body 38 * exchange 交换机 39 * routingKey 路由键 40 * 41 * body 消息体 42 *//* 43 channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/ 44 45 /** 46 * 集群环境下,多个消费者情况下。消费者默认采用均摊 47 */ 48 for(int i=1; i<11; i++){ 49 String mes = "demo_message汉字"+i; 50 System.out.println("发送消息"+mes); 51 channel.basicPublish("", QUEUENAME, null, mes.getBytes()); 52 } 53 54 55 // System.out.println("发送消息"+mes); 56 57 channel.close(); 58 connection.close(); 59 } 60 61 }
消费者1:
1 package com.kf.queueDemo.fairQueue; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 /** 13 * 公平模式消费者 14 * @author kf 15 * 16 */ 17 public class FairConsumer1 { 18 //队列名称 19 private static String QUEUENAME = "SIMPLEQUEUE"; 20 21 public static void main(String[] args) throws IOException, TimeoutException{ 22 System.out.println("01开始接收消息"); 23 Connection connection = RabbitConnectionUtils.getConnection(); 24 25 //创建通道 26 final Channel channel = connection.createChannel(); 27 28 //通道里放入队列 29 /** 30 * 第一个参数是 队列名称 31 * 第二个参数指 要不要持久化 32 */ 33 channel.queueDeclare(QUEUENAME, false, false, false, null); 34 35 //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息) 36 channel.basicQos(1); 37 38 DefaultConsumer consumer = new DefaultConsumer(channel){ 39 //监听队列 40 @Override 41 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, 42 byte[] body) throws IOException {try { 43 Thread.sleep(500); 44 } catch (Exception e) { 45 }finally { 46 System.out.println("------------进入监听---------"); 47 String s = new String(body, "utf-8"); 48 System.out.println("获取到的消息是:"+s); 49 //手动应答。 50 /** 51 * 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式 52 */ 53 channel.basicAck(envelope.getDeliveryTag(), false); 54 }} 55 }; 56 57 //设置应答模式 58 /** 59 * 参数: 对列名,是否自动签收,监听的类 60 */ 61 System.out.println("获取消息的方法之前"); 62 channel.basicConsume(QUEUENAME, false, consumer); 63 System.out.println("获取消息的方法之后"); 64 65 } 66 67 68 }
消费者2:
package com.kf.queueDemo.fairQueue; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; /** * 公平模式消费者 * @author kf * */ public class FairConsumer2 { //队列名称 private static String QUEUENAME = "SIMPLEQUEUE"; public static void main(String[] args) throws IOException, TimeoutException{ System.out.println("02开始接收消息"); Connection connection = RabbitConnectionUtils.getConnection(); //创建通道 final Channel channel = connection.createChannel(); //通道里放入队列 /** * 第一个参数是 队列名称 * 第二个参数指 要不要持久化 */ channel.queueDeclare(QUEUENAME, false, false, false, null); //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息) channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ //监听队列 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (Exception e) { }finally { System.out.println("------------进入监听---------"); String s = new String(body, "utf-8"); System.out.println("获取到的消息是:"+s); //手动应答。 /** * 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式 */ channel.basicAck(envelope.getDeliveryTag(), false); } } }; //设置应答模式 /** * 参数: 对列名,是否自动签收,监听的类 */ System.out.println("获取消息的方法之前"); channel.basicConsume(QUEUENAME, false, consumer); System.out.println("获取消息的方法之后"); } }
以上是关于rabbit工作队列模式的主要内容,如果未能解决你的问题,请参考以下文章