RabbitMQ入门与五种工作模式
Posted 风雨秋烟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门与五种工作模式相关的知识,希望对你有一定的参考价值。
一、MQ 的基本概念
1.1 MQ概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息进行持久化的容器。多用于分布式系统之间进行通信,比如在分布式系统中,多个项目就可以使用MQ来进行通信,A项目的数据发送MQ容器中,B项目在通过MQ容器拿到消息
1.2 应用解耦
上图可以看出库存、支付、物流、都依赖于订单系统,如果库存系统挂了势必会影响订单系统,如果订单系统也挂了,库存、支付、物流都会挂掉,如果我增加一个X系统也依赖于订单系统,也会挂掉,耦合性太高了,下面我们用MQ来解耦
上图使用MQ解耦后,就好多嘞,订单系统的数据发送给MQ,库存、支付、物流系统直接依赖于MQ,库存挂了不影响其他系统了,因为订单、支付、物流都和MQ打交到了,哪怕我再增加一个X系统,也和MQ打交道了,库存系统恢复了后,直接也和MQ建立关系,继续运作。
1.3 异步提速
上图可以看出订单系统去DB里查询数据需要20ms,库存、支付、物流三个再去查询订单各自都需要300ms,每个系统得到结果都需要320ms。
上图使用MQ解耦后,就快多了,订单数据在DB查询出来传递给MQ,库存、支付、物流三个在DB中拿取只有5秒,每个系统得到数据只有25ms。
1.4 削峰填谷
上图比如A系统的某个接口,一次最多可以接收到1000个请求,高并发情况下每秒达到了5000个请求,这样系统肯定会崩溃的,使用MQ进行削峰填谷
上图可以看出加上了MQ后,一次5000个请求全部到达MQ里面去了,但是这种情况MQ会积压很多消息,这就叫做削峰,等到高峰期过了之后,A系统设置每秒从MQ拉取1000次,慢慢的消费完MQ里的消息,叫做填谷。
1.5 常见的 MQ 产品
1.6 RabbitMQ 简介
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
上图就是RabbitMq中的简介
Producer:生产者,消息的发起者
Consumer:消费者,消息的接受者
Connection:生产者和消费者与MQ服务器建立的长连接
channel:相当于轻量级的Connection,每次发送接收消息都建立一个Connection长连接太浪费资源了,就在Connection内部建立逻辑连接channel。
Broker:就是MQ服务器
Virtual host:虚拟机,相当于一个MQ服务器的一个分组,可以有多个
Exchange:交换机,消息到达MQ的第一站,消息由交换机根据routing key转发给不同的队列。
Queue:队列,消息最终由交换机到达队列等待消费者来拿取
Binding:绑定交换机和队列之间的关系
1.7 RabbitMQ 提供了 5种工作模式
RabbitMQ 提供了 5 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
下面就来熟悉5种模式,先看看项目结构
首先需要创建一个maven项目,这里不再讲述如何创建maven项目了
二、五种队列模式
创建maven项目后,需要先创建RabbitMQ所需要的工具类
在com.wdp.rabbitmq.utils包下创建RabbitUtils工具类,该工具类是存放RabbitMQ账号密码虚拟机连接等
package com.wdp.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("39.107.90.1");//服务器地址
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("wdp123"); //mq的账号
connectionFactory.setPassword("wdp123"); //mq的密码
connectionFactory.setVirtualHost("wdp"); //mq的虚拟机
}
/**
* 建立与mq的长连接
* @return
*/
public static Connection getConnection(){
Connection conn = null;
try {
conn = connectionFactory.newConnection();
return conn;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
在com.wdp.rabbitmq.utils包下创建RabbitConstant类定义交换机和队列的名称
package com.wdp.rabbitmq.utils;
public class RabbitConstant {
public static final String QUEUE_HELLOWORLD = "helloworld";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
2.1、简单队列
P:消费者
C:消费者
红框:队列
简单队列只有一个生产者一个消费者,发送消息过后,消费者去队列拿取消息消费
在com.wdp.rabbitmq.hellword包下创建生产者
package com.wdp.rabbitmq.hellword;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取mq建立的长连接
Connection connection = RabbitUtils.getConnection();
//创建通信通道,生产者从通道发送数据到mq服务器
Channel channel = connection.createChannel();
//创建队列,声明并创建一个队列,如果队列已经存在直接使用
//第一个参数:队列名称
//第二个参数:数据是否持久化,false对应不持久化的数据,如果数据没有被消费者消费,消费者MQ停掉数据就会丢失
//第三个参数:队列是否私有化,false代表所有消费者都可以访问,true代表只有第一次使用它的消费者才可以访问
//第四个参数:是否自动删除,false代表停掉后不自动删除这个队列
//其他额外的参数,null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);
String message = "hello word";
//第一个参数:交换机,简单队列暂时不需要
//第二个参数:队列的名称
//第三个参数:额外的设置属性
//第四个参数:传递消息的字节数组
channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORLD,null,message.getBytes());
channel.close();
connection.close();
System.out.println("发送成功");
}
}
在com.wdp.rabbitmq.hellword包下创建消费者
package com.wdp.rabbitmq.hellword;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws IOException {
//获取mq建立的长连接
Connection connection = RabbitUtils.getConnection();
//创建通信通道,生产者从通道发送数据到mq服务器
Channel channel = connection.createChannel();
//创建队列,声明并创建一个队列,如果队列已经存在直接使用
//第一个参数:队列名称
//第二个参数:数据是否持久化,false对应不持久化的数据,如果数据没有被消费者消费,消费者MQ停掉数据就会丢失
//第三个参数:队列是否私有化,false代表所有消费者都可以访问,true代表只有第一次使用它的消费者才可以访问
//第四个参数:是否自动删除,false代表停掉后不自动删除这个队列
//其他额外的参数,null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false,false,false,null);
//从MQ服务器中获取数据
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类,手动进行确认消息
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false,new Reciver(channel));
}
}
class Reciver extends DefaultConsumer{
private Channel channel;
//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
//false只确认签收当前的信息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
注意:简单队列是不需要交换机的,会走默认的交换机
2.2 Work queues 工作队列模式
在这里插入图片描述
P:消费者
C1:消费者
C2:消费者
红框:队列
工作队列:是生产者投递到了队列,消费者会有多个同时去队列去。
应用场景:多个消费者同时绑定一个队列,比如一条短信要让所有人收到,就可以使用工作队列模式
在com.wdp.rabbitmq.workqueue包下创建一个SMS类用于发送消息
package com.wdp.rabbitmq.workqueue;
public class SMS {
private String name;
private String mobile;
private String content;
public SMS(String name, String mobile, String content) {
this.name = name;
this.mobile = mobile;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
在com.wdp.rabbitmq.workqueue下创建OrderSystem类
package com.wdp.rabbitmq.workqueue;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
for(int i = 1 ; i <= 100 ; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
}
System.out.println("发送数据成功");
channel.close();
connection.close();
}
}
在com.wdp.rabbitmq.workqueue包下创建SMSSender1消费者
package com.wdp.rabbitmq.workqueue;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
*
* 消费者
*/
public class SMSSender1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
在com.wdp.rabbitmq.workqueue包下创建SMSSender2消费者
package com.wdp.rabbitmq.workqueue;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
*
* 消费者
*/
public class SMSSender2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
在com.wdp.rabbitmq.workqueue包下创建SMSSender3消费者
package com.wdp.rabbitmq.workqueue;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
/**
*
* 消费者
*/
public class SMSSender3 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.basicQos(1);//处理完一个取一个
channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("SMSSender3-短信发送成功:" + jsonSMS);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
注意:
channel.basicQos(1);这句代码,这句代码的意思是一个消息必须处理签收完毕在能开始下一个消息,思想就是能者多劳,多个消费者有的执行的快多消费,有的执行的慢就慢消费,工作队列是一对多,必须多个消费者绑定一个队列。
2.3 Pub/Sub 订阅模式
P:生产者
X:交换机
C1:消费者
C2:消费者
红框:队列
发布订阅模式分为
1、生产者发送消息需要指定一个交换机,先发送到交换机里
2、消费者需要建立队列和交换机的绑定关系
3、交换机将消息推送给绑定了该交换机的队列
交换机有三种模式:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给通过#,*匹配的(路由模式)队列
发布订阅模式的使用场景:比如北京市的天气预报,新浪和百度都需要北京市的天气预报,生产者将天气的消息投递给交换机,百度的队列和新浪的队列都绑定了这个交换机,百度和新浪都可以拿到北京市的天气预报,发布订阅模式使用的是广播模式
在com.wdp.rabbitmq.pubsub包下创建生产者WeatherBureau
package com.wdp.rabbitmq.pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());
channel.close();
connection.close();
}
}
在com.wdp.rabbitmq.pubsub包下创建Sina
package com.wdp.rabbitmq.pubsub;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
public class Sina {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
//queueBind用于交换机和队列的绑定
//参数1: 队列名 参数2:交互机名 参数3:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER,"");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
在com.wdp.rabbitmq.pubsub包下创建BiaDu
package com.wdp.rabbitmq.pubsub;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
public class BiaDu {
public static void main(String[] args) throws IOException {
//获取TCP长连接
Connection connection = RabbitUtils.getConnection();
//获取虚拟连接
final Channel channel = connection.createChannel();
//声明队列信息
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于交换机和队列的绑定
//参数1: 队列名 参数2:交互机名 参数3:路由key(暂时用不到)
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER,"");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
2.4 Routing 路由模式
P:生产者
X:交换机
C1:消费者
C2:消费者
红框:队列
Routing 路由模式分为3个步骤
1、生产者发送消息到交换机的时候会绑定一个Routing key
2、消费者去队列取消息的时候,消费者建立队列和交换机和Routing key的关系
3、交换机会根据队列的Routing key进行分发消息
上图的图解:
生产者发送error、info、warning为Routing key的消息到交换机,交换机将Routing key为error的消息发送给c1消费者所在的队列,因为c1消费者指定队列消息的Routing key为error,交换机将Routing key为error、info、warning发送给c2消费者所在的队列,因为c2消费者指定队列消息的Routing key为error、info、warning。
应用场景:还是用天气举例,比如北京市和上海市,两个不同的城市天气也不一样,就需要交换机发送不同的天气信息给不同的城市
在com.wdp.rabbitmq.routing包下创建生产者WeatherBureau
package com.wdp.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
*
* 发布者
*/
public class WeatherBureau {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator();
while (itr.hasNext()){
Map.Entry<String, String> me = itr.next();
//第一个参数交换机名字 第二个参数作为消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey(),null,me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
在com.wdp.rabbitmq.routing包下创建Sina消费者
package com.wdp.rabbitmq.routing;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数3:路由key
channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.zhuzhou.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
在com.wdp.rabbitmq.routing包下创建BaiDu消费者
package com.wdp.rabbitmq.routing;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
public class BaiDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数3:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.changsha.20201127");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
2.5 Topics 通配符模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
应用场景:还是天气的例子,比如说,一个城市下有很多个区县,区县的天气会不一样,这点类似模糊查询,我需要所有东边区县的天气,就需要通过通配符匹配
在com.wdp.rabbitmq.routing包下创建WeatherBureau
package com.wdp.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
*
* 发布者
*/
public class WeatherBureau {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator();
while (itr.hasNext()){
Map.Entry<String, String> me = itr.next();
//第一个参数交换机名字 第二个参数作为消息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey(),null,me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
在com.wdp.rabbitmq.routing包下创建Sina
package com.wdp.rabbitmq.routing;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数3:路由key
channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.zhuzhou.20201127");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新浪天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
在com.wdp.rabbitmq.routing包下创建BaiDu
package com.wdp.rabbitmq.routing;
import com.rabbitmq.client.*;
import com.wdp.rabbitmq.utils.RabbitConstant;
import com.wdp.rabbitmq.utils.RabbitUtils;
import java.io.IOException;
public class BaiDu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
//queueBind用于将队列与交换机绑定
//参数1:队列名 参数2:交互机名 参数3:路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_ROUTING,"china.hunan.changsha.20201127");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度天气收到气象信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
以上是关于RabbitMQ入门与五种工作模式的主要内容,如果未能解决你的问题,请参考以下文章