MQ的订阅模式
Posted 曹军
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ的订阅模式相关的知识,希望对你有一定的参考价值。
一:介绍
1.模式
2.使用场景
一个生产者,多个消费者
每一个消费者都有自己的队列
生产者没有直接把消息发送给队列,而是发送到了交换机
每一个队列都要绑定到交换机
可以实现一个消息被多个消费者消费。
二:程序
1.生产者
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class FanoutSend { 8 private static final String EXCHANGE_NAME="text_exchange_fanout"; 9 public static void main(String[] args) throws Exception { 10 //获取一个连接 11 Connection connection= ConnectionUtil.getConnection(); 12 //从连接中获取一个通道 13 Channel channel=connection.createChannel(); 14 //创建交换机 15 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); 16 17 //消息 18 String msg="hello pubsub"; 19 20 //发送 21 channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); 22 23 System.out.println("send msg:"+msg); 24 //关闭连接 25 channel.close(); 26 connection.close(); 27 } 28 }
2.消费者一
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive1 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_email"; 11 public static void main(String[] args)throws Exception{ 12 //获取一个连接 13 Connection connection = ConnectionUtil.getConnection(); 14 //创建通道 15 final Channel channel = connection.createChannel(); 16 //创建队列声明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //绑定交换机 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能发送一个消息 23 channel.basicQos(1); 24 25 //创建消费者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[1]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手动应答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //监听队列,不是自动应答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
3.消费者二
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive2 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_ems"; 11 public static void main(String[] args)throws Exception{ 12 //获取一个连接 13 Connection connection = ConnectionUtil.getConnection(); 14 //创建通道 15 final Channel channel = connection.createChannel(); 16 //创建队列声明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //绑定交换机 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能发送一个消息 23 channel.basicQos(1); 24 25 //创建消费者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[2]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手动应答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //监听队列,不是自动应答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
4.效果
send:
receive1:
receive2:
5.运行注意点
如果之间运行receive类,会发现报错,因为没有交换机。
所以,可以先运行send类,虽然交换机不能存储发送的消息,但是可以创建交换机。
然后,就可以按照原来的方式。
先启动两个消费者进行监听,然后启动生产者。
现象:就是消费者都获取到了生产者生产的消息。
以上是关于MQ的订阅模式的主要内容,如果未能解决你的问题,请参考以下文章
aliyun-ons (node版)之 集群模式下模拟广播消费
ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)