RabbitMQ之六种队列模式
Posted ssh-html
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ之六种队列模式相关的知识,希望对你有一定的参考价值。
先学习一下RabbitMQ中的六种队列,只学习前五种,具体的官方文档地址是:http://next.rabbitmq.com/getstarted.html
导入maven依赖:
1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>3.4.1</version> 5 </dependency>
一、简单队列
1、图示
P:消息的生产者
C:消息的消费者
红色:队列
生产者将消息发送到队列,消费者从队列中获取消息。
2、获取MQ的连接
1 public static Connection getConnection() throws Exception { 2 //定义连接工厂 3 ConnectionFactory factory = new ConnectionFactory(); 4 //设置服务地址 5 factory.setHost("localhost"); 6 //端口 7 factory.setPort(5672); 8 //设置账号信息,用户名、密码、vhost 9 factory.setVirtualHost("/taotao"); 10 factory.setUsername("taotao"); 11 factory.setPassword("taotao"); 12 // 通过工程获取连接 13 Connection connection = factory.newConnection(); 14 return connection; 15 }
3、生产者发送消息到队列
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 // 从连接中创建通道 9 Channel channel = connection.createChannel(); 10 11 // 声明(创建)队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 消息内容 15 String message = "Hello World!"; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent ‘" + message + "‘"); 18 19 //关闭通道和连接 20 channel.close(); 21 connection.close(); 22 } 23 }
4、管理工具中查看消息
点击上面的队列名称,查询具体的队列中的信息:
5、消费者从队列中获取消息
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 定义队列的消费者 15 QueueingConsumer consumer = new QueueingConsumer(channel); 16 // 监听队列 17 channel.basicConsume(QUEUE_NAME, true, consumer); 18 19 // 获取消息 20 while (true) { 21 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 22 String message = new String(delivery.getBody()); 23 System.out.println(" [x] Received ‘" + message + "‘"); 24 } 25 } 26 }
二、 Work模式
1、图示
一个生产者、2个消费者。
一个消息只能被一个消费者获取。
2、消费者1
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者 15 //channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,手动返回完成 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received ‘" + message + "‘"); 27 //休眠 28 Thread.sleep(10); 29 // 返回确认状态 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
3、消费者2
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者 15 //channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,手动返回完成状态 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received ‘" + message + "‘"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
4、生产者
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明队列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 //向队列中发送50条消息 14 for (int i = 0; i < 50; i++) { 15 // 消息内容 16 String message = "" + i; 17 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 18 System.out.println(" [x] Sent ‘" + message + "‘"); 19 20 Thread.sleep(i * 10); 21 } 22 23 channel.close(); 24 connection.close(); 25 } 26 }
5、测试结果
测试结果:
1、 消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、 消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。
其实,这样是不合理的,应该是消费者1要比消费者2获取到的消息多才对。
6、Work模式的“能者多劳”
测试:
消费者1比消费者2获取的消息更多。
这种是比较符合实际情况的,能者多劳,RabbitMQ客户端同一时刻只会给消费者发送一条消息,消费者拿到消息后,重新向客户端拿消息,做的快的消费者要的消息多,同理,做的慢的消费者要的消息少,就体现出来能者多劳的机制了。
7、消息的确认模式
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
7.1 手动模式
7.2 自动模式
三、订阅模式
1、图示
解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注释:Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
2、消息的生产者(看作是后台系统)
向交换机中发行消息
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 12 13 // 消息内容 14 String message = "商品已经被更新,id=1001"; 15 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 16 System.out.println(" 后台系统: ‘" + message + "‘"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
3、消费者1(看作是前台系统)
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_ps_1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" 前台系统: ‘" + message + "‘"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
4、消费者2(看作是搜索系统)
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_ps_2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" 搜索系统: ‘" + message + "‘"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
5、测试
测试结果:
同一个消息被多个消费者获取。
在管理工具中查看队列和交换机的绑定关系:
6、使用订阅模式能否实现商品数据的同步?
答案:可以的。
后台系统就是消息的生产者。
前台系统和搜索系统是消息的消费者。
后台系统将消息发送到交换机中,前台系统和搜索系统都创建自己的队列,然后将队列绑定到交换机,即可实现。
消息,新增商品、修改商品、删除商品。
前台系统:修改商品、删除商品。
搜索系统:新增商品、修改商品、删除商品。
所以使用订阅模式实现商品数据的同步并不合理。
四、路由模式
1、图示
注释:Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
2、生产者
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_direct"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 12 13 // 消息内容 14 String message = "商品删除,id=1002"; 15 channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes()); 16 System.out.println(" 后台系统: ‘" + message + "‘"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
3、消费者1(前台系统)
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_direct_1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_direct"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); 18 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); 19 20 // 同一时刻服务器只会发一条消息给消费者 21 channel.basicQos(1); 22 23 // 定义队列的消费者 24 QueueingConsumer consumer = new QueueingConsumer(channel); 25 // 监听队列,手动返回完成 26 channel.basicConsume(QUEUE_NAME, false, consumer); 27 28 // 获取消息 29 while (true) { 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody()); 32 System.out.println(" 前台系统: ‘" + message + "‘"); 33 Thread.sleep(10); 34 35 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 36 } 37 } 38 }
4、消费者2(搜索系统)
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_direct_2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_direct"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); 18 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); 19 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); 20 21 // 同一时刻服务器只会发一条消息给消费者 22 channel.basicQos(1); 23 24 // 定义队列的消费者 25 QueueingConsumer consumer = new QueueingConsumer(channel); 26 // 监听队列,手动返回完成 27 channel.basicConsume(QUEUE_NAME, false, consumer); 28 29 // 获取消息 30 while (true) { 31 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 32 String message = new String(delivery.getBody()); 33 System.out.println(" 搜索系统: ‘" + message + "‘"); 34 Thread.sleep(10); 35 36 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 37 } 38 } 39 }
五、通配符模式
1、图示
注释:Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。我在RedHat的朋友做了一张不错的图,来表明topic交换机是如何工作的:
2、生产者
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 12 13 // 消息内容 14 String message = "商品删除,id=1003"; 15 channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes()); 16 System.out.println(" 后台系统: ‘" + message + "‘"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
3、消费者1(前台系统)
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_topic_1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_topic"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); 18 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); 19 20 // 同一时刻服务器只会发一条消息给消费者 21 channel.basicQos(1); 22 23 // 定义队列的消费者 24 QueueingConsumer consumer = new QueueingConsumer(channel); 25 // 监听队列,手动返回完成 26 channel.basicConsume(QUEUE_NAME, false, consumer); 27 28 // 获取消息 29 while (true) { 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody()); 32 System.out.println(" 前台系统: ‘" + message + "‘"); 33 Thread.sleep(10); 34 35 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 36 } 37 } 38 }
4、消费者2(搜索系统)
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_topic_2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_topic"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#"); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" 搜索系统: ‘" + message + "‘"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
以上是关于RabbitMQ之六种队列模式的主要内容,如果未能解决你的问题,请参考以下文章