RabbitMQ学习笔记4-使用fanout交换器

Posted 自行车上的程序员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记4-使用fanout交换器相关的知识,希望对你有一定的参考价值。

fanout交换器会把发送给它的所有消息发送给绑定在它上面的队列,起到广播一样的效果。

本里使用实际业务中常见的例子,

订单系统:创建订单,然后发送一个事件消息

积分系统:发送订单的积分奖励

短信平台:发送订单的短信

 

消息生产者SenderWithFanoutExchange

 1 package com.yzl.test3;
 2 
 3 import java.util.Date;
 4 
 5 import com.google.gson.Gson;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 
10 /**
11  *    使用fanout交换器产生事件,消费者订阅事件做相应的处理
12  * @author: yzl
13  * @date: 2016-10-22
14  */
15 public class SenderWithFanoutExchange {
16     //交换器名称
17     private static final String EXCHANGE_NAME = "myFanoutExchange";
18     
19     public static void main(String[] args) throws Exception {
20         //连接到rabbitmq服务器
21         ConnectionFactory factory = new ConnectionFactory();
22         factory.setHost("localhost");
23         Connection connection = factory.newConnection();
24         //创建一个信道
25         final Channel channel = connection.createChannel();
26         //定义一个名字为topicExchange的fanout类型的exchange
27         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
28         
29       //创建一个时间的Event对象
30         EventObj createOrderEvent = null;
31         for(int i=1; i<10; i++){
32             createOrderEvent = new EventObj();
33             createOrderEvent.setUserId(Long.valueOf(i));
34             createOrderEvent.setCreateTime(new Date());
35             createOrderEvent.setEventType("create_order");
36             //转成JSON
37             String msg = new Gson().toJson(createOrderEvent);
38             
39             System.out.println("send msg:" + msg);
40             
41             //使用order_event路由键来发送该事件消息
42             channel.basicPublish(EXCHANGE_NAME, "order_event", null, msg.getBytes());
43             
44             Thread.sleep(1000);
45         }
46         
47         channel.close();
48         connection.close();
49     }
50 }

消费消费者ReceiverWithFanoutExchange

 1 package com.yzl.test3;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 import com.rabbitmq.client.DefaultConsumer;
 9 import com.rabbitmq.client.Envelope;
10 import com.rabbitmq.client.AMQP.BasicProperties;
11 
12 /**
13  * 使用fanout交换器接收订单事件消息
14  * 
15  * @author: yzl
16  * @date: 2016-10-22
17  */
18 public class ReceiverWithFanoutExchange {
19     // 交换器名称
20     private static final String EXCHANGE_NAME = "myFanoutExchange";
21     //接收订单事件并发放积分的队列
22     private static final String QUEUE_ORDER_REWARD_POINTS = "rewardOrderPoints";
23     //发放订单积分的路由键
24     private static final String ROUTING_KEY_ORDER_POINTS = "reward_order_points";
25     //接收订单事件并发短信的队列
26     private static final String QUEUE_ORDER_SEND_SMS = "sendOrderSms";
27     //发送订单短信的路由键
28     private static final String ROUTING_KEY_ORDER_SMS = "send_order_sms";
29     
30     private static Channel channel = null;
31     
32     static{
33         try{
34             // 连接到rabbitmq服务器
35             ConnectionFactory factory = new ConnectionFactory();
36             factory.setHost("localhost");
37             Connection connection = factory.newConnection();
38             // 创建一个信道
39             channel = connection.createChannel();
40             // 定义一个名字为myFanoutExchange的fanout类型的exchange
41             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
42         }catch (Exception e) {
43             // TODO: handle exception
44         }
45     }
46     
47     /**
48      * 发放订单的积分奖励
49      */
50     public static void rewardPoints() throws Exception {
51         channel.queueDeclare(QUEUE_ORDER_REWARD_POINTS, false, false, false, null);
52         channel.queueBind(QUEUE_ORDER_REWARD_POINTS, EXCHANGE_NAME, ROUTING_KEY_ORDER_POINTS);
53         
54         channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){
55             @Override
56             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
57                     throws IOException {
58                 String msg = new String(body);
59                 System.out.println("积分系统接收到订单创建的事件消息 :" + msg);
60                 System.out.println("准备发放积分.....");
61             }
62         });
63     }
64     
65     /**
66      * 发送订单成功的短信
67      */
68     public static void sendSms() throws Exception {
69         channel.queueDeclare(QUEUE_ORDER_SEND_SMS, false, false, false, null);
70         channel.queueBind(QUEUE_ORDER_SEND_SMS, EXCHANGE_NAME, ROUTING_KEY_ORDER_SMS);
71         
72         channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){
73             @Override
74             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
75                     throws IOException {
76                 String msg = new String(body);
77                 System.out.println("短信平台接收到订单创建的事件消息 :" + msg);
78                 System.out.println("准备发送短信.....");
79             }
80         });
81     }
82 
83     public static void main(String[] args) throws Exception {
84         rewardPoints();
85         sendSms();
86     }
87 }

运行结果输出:

1 send msg:{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}
2 send msg:{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}
3 send msg:{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}
4 send msg:{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}
5 send msg:{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}
6 send msg:{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}
7 send msg:{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}
8 send msg:{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}
9 send msg:{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}
 1 积分系统接收到订单创建的事件消息 :{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}
 2 准备发放积分.....
 3 短信平台接收到订单创建的事件消息 :{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}
 4 准备发送短信.....
 5 积分系统接收到订单创建的事件消息 :{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}
 6 准备发放积分.....
 7 短信平台接收到订单创建的事件消息 :{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}
 8 准备发送短信.....
 9 积分系统接收到订单创建的事件消息 :{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}
10 准备发放积分.....
11 短信平台接收到订单创建的事件消息 :{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}
12 准备发送短信.....
13 积分系统接收到订单创建的事件消息 :{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}
14 准备发放积分.....
15 短信平台接收到订单创建的事件消息 :{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}
16 准备发送短信.....
17 积分系统接收到订单创建的事件消息 :{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}
18 准备发放积分.....

以上是关于RabbitMQ学习笔记4-使用fanout交换器的主要内容,如果未能解决你的问题,请参考以下文章

等待 fanout 交换上的所有 rabbitmq 响应?

RabbitMQ学习笔记

spring boot整合RabbitMQ(Fanout模式)

RabbitMQ 交换器持久化

RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收