rabbitmq队列清理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq队列清理相关的知识,希望对你有一定的参考价值。
参考技术A 如果队列拥塞,紧急情况下,为恢复业务,不得不进行队列清理(需要确认清楚,数据丢失的后果是你能承受的).在web管理端可以进行操作,选择到要操作的队列,下拉里面有一个Purge(不要选到delete)
消息队列RabbitMQ
1.RabbitMQ简介
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
市场上其他消息队列:ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
erlang:一种支持高并发的语言
AMQP:AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式,为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
JMS:JMS是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。
应用场景:
1、任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
优点:
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
2.RabbitMQ基本结构:
组成部分说明如下:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程:
-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
3.RabbitMQ基本使用
Rabbit研究-入门程序·生成者
创建springBoot工程,引入RabbitMQ客户端的起步依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
向RabbitMQ发送一个消息
public class RabbitMQTest { //队列 private static final String QUEUE = "helloworld"; @Test public void test01(){ //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.72.128"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE,true,false,false,null); //发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ //消息内容 String message = "hello world 黑马程序员"; channel.basicPublish("",QUEUE,null,message.getBytes()); System.out.println("send to mq "+message); } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
Rabbit研究-入门程序·消费者
注意:在消费者中不需要关闭通道,不然程序无法实时监听消息队列的消息
public class RabbitMQTestConsumer { //队列 private static final String QUEUE = "helloworld"; public static void main(String[] args) { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.72.128"); connectionFactory.setPort(5672);//端口 // connectionFactory.setUsername("guest"); // connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try{ //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE,true,false,false,null); //定义消费方法 DefaultConsumer consumer=new DefaultConsumer(channel){ /** *消费者接收消息调用此方法 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定 * * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送) * @param properties * @param body * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg); } }; /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE, true, consumer); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally { //这里不能关闭通道,因为关闭之后,消费者就不能实时监听消息队列 //channel.close(); } } }
4.RabbitMQ有以下几种工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
Java声明交换机类型:
//声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
4.1work工作模式(资源竞争)
工作模式图:
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
4.2publish/subscribe发布订阅(共享资源)
工作模式图:
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
3.如果一个队列有多个消费者,相对于一个队列来说就会变成work工作模式,竞争资源
所以说发布订阅模式功能更强大
生产者代码:
一个交换机中所有队列发送的是同样的消息指令。
public class RabbitMQTestPublic { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,""); //发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ for(int i=0;i<5;i++){ //消息内容 String message = "send inform message to user"; channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes()); System.out.println("send to mq "+message); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
两个消费者
Email消费者:
public class Consumer02_subscribe_email { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.72.128"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, ""); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
SMS消费者:
public class Consumer02_subscribe_sms { //队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.72.128"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, ""); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }
思考:
1、publish/subscribe与work queues有什么区别。
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑 定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实质工作用什么 publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。
4.3routing路由模式
工作模式图:
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
3.如果当每个队列有一个相同的routingkey的时候,此时routing路由模式,就相当于发布订阅模式发送的队列消息,每一个队列都会接收并发送
生产者代码:
public class Producer03_routing { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform"; private static final String ROUTINGKEY_EMAIL="inform_email"; private static final String ROUTINGKEY_SMS="inform_sms"; public static void main(String[] args) { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.72.128"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL); //channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform"); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS); //channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform"); //发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send email inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send sms inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes()); System.out.println("send to mq "+message); } // for(int i=0;i<5;i++){ // //发送消息的时候指定routingKey // String message = "send inform message to user"; // channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes()); // System.out.println("send to mq "+message); // } } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
两个消费者代码
Email消费者:
public class Consumer03_routing_email { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform"; private static final String ROUTINGKEY_EMAIL="inform_email"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.72.128"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,"inform"); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
SMS消费者代码:
public class Consumer03_routing_sms { //队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform"; private static final String ROUTINGKEY_SMS="inform_sms"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.72.128"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS); channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,"inform"); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }
思考:
1、Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
4.4topic 主题模式(路由模式的一种)
工作模式图:
主题路由模式:
1.这种模式下需要RouteKey,客户端要提前绑定Exchange与Queue。
2.如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
3.客户端在进行绑定时,要提供一个该队列“感兴趣”的主题,如“#.log.#” 表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的 消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能 与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#” 能与上述两者匹配
生产者代码:
public class Producer04_topics { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; private static final String ROUTINGKEY_SMS="inform.#.sms.#"; public static void main(String[] args) { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.56.101"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS); //发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send email inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send sms inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send sms and email inform message to user"; channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes()); System.out.println("send to mq "+message); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消费者代码:
public class Consumer04_topics_email { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.56.101"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
public class Consumer04_topics_sms { //队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_SMS="inform.#.sms.#"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.56.101"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }
思考:
1、本案例的需求使用Routing工作模式能否实现?
使用Routing模式也可以实现本案例,共设置三个 routingkey,分别是email、sms、all,email队列绑定email和all,sms队列绑定sms和all,这样就可以实现上边案例的功能,实现过程比topics复杂。
Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能。
4.5Header模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配 队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
队列与交换机绑定
Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
生产者
String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");//匹配email通知消费者绑定的header //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
4.6RPC模式
工作模式:
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
以上是关于rabbitmq队列清理的主要内容,如果未能解决你的问题,请参考以下文章