RabbitMQ支持的消息模型
Posted 李子怡
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ支持的消息模型相关的知识,希望对你有一定的参考价值。
目录
4.消费者 创建2个: // 区别是是否带下面这个线程休眠代码
1 RabbitMQ
基于AMQP协议,erlang语言开发, 是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
官方教程:https://www.rabbitmq.com/getstarted.html
AMQP协议
AMQP (advanced message queuing protocol,翻译是:高级消息队列协议)、 在2003年时被提出, 最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是- 种协议,更准确的说是一种binary wirelevel protocol (链接协议)。 这是其和JMS的本质差别,AMQP不从API层进行限定, 而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:
生产者把消息,发送给交换机。交换机和队列是一一绑定就是点对点,交换机路由到其他队列,可以做路由。
2 RabbitMQ支持的消息模型
2.1官网
https://www.rabbitmq.com/getstarted.html
2.2引入依赖
<!--引入rabbitmq的相关依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
为什么写测试代码需要把下面这行删掉?因为这是作用范围,
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<!-- <scope>test</scope>-->
</dependency>
2.3第一种模型(直连)
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会-直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
1.开发生产者
public class Provider {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建到服务器的连接
ConnectionFactory factory = new ConnectionFactory();
// 连接rabbitmq主机
factory.setHost("192.168.231.141");
// 设置端口号
factory.setPort(5672);
// 设置连接那个虚拟主机
factory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
factory.setUsername("ems");
factory.setPassword("123456");
// 获取连接对象
try (Connection connection = factory.newConnection();
// 获取连接中的通道对象
Channel channel = connection.createChannel()) {
// 通道绑定对应消息队列
// 参数1.队列名 如果队列不存在,自动创建
// 2. durable 用来定义队列的特性是否要持久化 true 持久化
// 3. 是否独占队列 true独占
// 4. 是否在消费完成后,自动删除队列, true 自动删除
// 5. 额外参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 发布消息
// 1.交换机名称,因为没有交换机,所以为空
// 2.队列名
// 3.发布消息时的属性|传递消息的额外设置
// 4.发布消息的具体内容,要求是字节类型的数组,
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
这里没有明显的写关闭代码是因为官网说:
2.开发消费者
消费了消息
这里不需要关闭的代码是因为,希望他可以一直监听,等到有消息的时候,直接拿到消息:
2.4RabbitMQ中连接工具类封装
public abstract class RabbitMQUtils {
// 重量级资源,不希望使用一次创建一次。而是希望在类加载的时候就创建出来。
private static ConnectionFactory factory = new ConnectionFactory();
// 静态代码块,类加载的时候只执行一次
static {
// 连接rabbitmq主机
factory.setHost("192.168.231.141");
// 设置端口号
factory.setPort(5672);
// 设置连接那个虚拟主机
factory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
factory.setUsername("ems");
factory.setPassword("123456");
}
public static Connection getConnection() {
try {
// 获取连接对象
Connection connection = factory.newConnection();
return connection;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
2.5细节
队列绑定通道
// 通道绑定对应消息队列
// 参数1.队列名 如果队列不存在,自动创建
// 2. durable 用来定义队列的特性是否要持久化 true 持久化 持久化是指在磁盘中进行队列的保存,当重启rabbitmq服务时,队列以及队列中的消息,都会丢失
// 3. 是否独占队列 true独占
// 4. 是否在消费完成后,自动删除队列, true 自动删除
// 5. 额外参数
channel.queueDeclare("A", false, false, false, null);
但是,不是绑定了,就一定在这里发送消息。是由下面的代码决定的:
String message = "Hello World!";
// 发布消息
// 1.交换机名称,因为没有交换机,所以为空
// 2.队列名
// 3.发布消息时的属性|传递消息的额外设置
// 4.发布消息的具体内容,要求是字节类型的数组,
channel.basicPublish("", "B", null, message.getBytes());
虽然队列A和通道channel绑定了,但是实际是发送消息的是队列B。队列A和队列B如果在页面上没有,执行代码后会新建。
queueDeclare第二个参数:
下面出现D表示设置为true,现在是队列持久化。但是重启后,消息还是会丢失。
怎么既保证队列持久化,又保证消息持久化呢?
对消息做持久化。
basicPublish第三个参数:设置为:MessageProperties.PERSISTENT_TEXT_PLAIN
queueDeclare第三个参数:
一般是false,希望多个通道共用一个队列
queueDeclare第四个参数:
当设置为true时,会出现下面的标志。且关闭消费者与队列的连接(关闭消费者的程序),队列就会消失。
2.6第二种模型(work quene)
1.定义:
Work queues, 也被称为( Task queues), 任务模型。 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
2.角色:
●P:生产者:任务的发布者
●C1:消费者,领取任务并且完成任务,假设完成速度较慢
●C2:消费者2:领取任务并完成任务,假设完成速度快
3.生产者:
public class Provider {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取连接中的通道对象
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 发送10条消息
for (int i = 0; i < 20; i++) {
channel.basicPublish("", TASK_QUEUE_NAME, null, (i+"你好啊").getBytes());
}
}
}public class Provider {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取连接中的通道对象
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 发送10条消息
for (int i = 0; i < 20; i++) {
channel.basicPublish("", TASK_QUEUE_NAME, null, (i+"你好啊").getBytes());
}
}
}
4.消费者 创建2个: // 区别是是否带下面这个线程休眠代码
public class Customer2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取连接中的通道对象
Channel channel = connection.createChannel();
// 每次只能消费一个消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 获取消息 参数2:消息自动确认,消费者自动向rabbitmq确认消息消费(只要消息队列有消息,就分配给消费者)
channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1:" + new String(body));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
5.总结:
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
6.缺点:
这样情况下,当其中一个消费比较慢,比如消费者3已经消费完。但是消费者1还是一个个的接收。这样会造成消息的积累。
我们希望处理快的可以多处理一点。怎么实现?
1.消费者:要关闭自动确认消息
// 获取消息 参数2:消息自动确认,消费者自动向rabbitmq确认消息消费(只要消息队列有消息,就分配给消费者,不管下面的代码是否执行完)
channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1:" + new String(body));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
所以要把上面的参数设为false,这样
2.还需要告诉当前通道每一次只能消费一个消息。
// 每次只能消费一个消息|一次只接收一条未确认的消息
channel.basicQos(1);
3.消息确认:手动确认消息
出现下面情况,是因为少消息确认的代码
// 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
消息确认后,再从队列中删除
2.7.第三种模型(fanout| 广播)
fanout 扇出 也称为广播
适合于注册业务,如既要发积分,有要短信认证。?“??
比如购物车结算的时候,是订单系统,库存系统 ??
1.在广播模式下,消息发送流程是这样的:
●可以有多个消费者
●每个消费者有自己的queue (队列)
●每个队列都要绑定到Exchange (交换机)
●生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
●交换机把消息发送给绑定过的所有队列
●队列的消费者都能拿到消息。实现一 条消息被多个消费者消费
2.生产者
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取连接中的通道对象
Channel channel = connection.createChannel();
// 将通道声明指定交换机 参数1:交换机名称 参数2:交换机类型,fanout是广播类型
channel.exchangeDeclare("logs","fanout");
// 发送消息
channel.basicPublish("logs","",null,"fanout type message".getBytes());
}
}
3.消费者
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue,"logs","");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1"+new String(body));
}
});
}
}
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue,"logs","");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2"+new String(body));
}
});
}
}
广播模型:生产者发一条消息,消费者1和2会都拿到
2.8第四种模型(Routing) 路由
1. Routing之订阅模型-Direct(直连)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
●队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey (路 由key)
●消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。
●Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息
举个场景,出现error时,既需要在控制台打印,又需要在日志中持久化
图解:
●P:生产者,向Exchange发送消息, 发送消息时,会指定一 个routing key。
●x: Exchange (交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
●C1:消费者,其所在队列指定了需要routing key为error的消息
●C2:消费者,其所在队列指定了需要routing key为info、 error、 warning的消息
2.生产者
public class Provider {
public static void main(String[] args) throws IOException {
// 创建connection
Connection connection = RabbitMQUtils.getConnection();
// 创建channel
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 通道声明指定交换机 参数1:交换机名称 参数2:交换机类型,direct是路由模式
channel.exchangeDeclare(exchangeName,"direct");
// 发送消息
channel.basicPublish(exchangeName,"error",null,("这是direct模型发布的基于route" +
" kye: ["+"info"+"]发送的消息").getBytes());
}
}
error时,消费者都会收到消息:
info时,消费者2会收到消息
3.消费者
public class Customer1 {
public static void main(String[] args) throws IOException {
// 1.建立连接
Connection connection = RabbitMQUtils.getConnection();
// 建立channel
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 绑定交换机和通道
channel.exchangeDeclare(exchangeName,"direct");
// 创建队列
String queue = channel.queueDeclare().getQueue();
// 基于路由key ,绑定队列和交换机
channel.queueBind(queue,exchangeName,"error");
// 消费
channel.basicConsume(queue,true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+ new String(body));
}
});
}
}
public class Customer2 {
public static void main(String[] args) throws IOException {
// 1.建立连接
Connection connection = RabbitMQUtils.getConnection();
// 建立channel
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 绑定交换机和通道
channel.exchangeDeclare(exchangeName,"direct");
// 创建队列
String queue = channel.queueDeclare().getQueue();
// 基于路由key ,绑定队列和交换机
channel.queueBind(queue,exchangeName,"info");
channel.queueBind(queue,exchangeName,"error");
channel.queueBind(queue,exchangeName,"warning");
// 消费
channel.basicConsume(queue,true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+ new String(body));
}
});
}
}
2.9.第五种模型(Topics) 动态路由
因为第四种消费者这样不灵活:就是一个写一行代码
1.定义:
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符。这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以"."分割,例如: item . insert
# 通配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配0个,1个或多个词
#如:
audit.# 匹配audit . irs . corporate或者audit.irs 等
audit.* 只能匹配audit.irs
2.生产者 当前是user.save
public class Provider {
public static void main(String[] args) throws IOException {
// 建立连接工厂
Connection connection = RabbitMQUtils.getConnection();
// 建立channel
Channel channel = connection.createChannel();
String exchangeName = "topics";
// 交换机
channel.exchangeDeclare(exchangeName,"topic");
// 发消息
String routekey = "user.save";
channel.basicPublish(exchangeName,routekey,null,("这是topic动态路由模型发布的基于route" +
" key :["+routekey+"]").getBytes());
}
}
3.消费者
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("topics","topic");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue,"topics","user.*");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1"+new String(body));
}
});
}
}
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("topics","topic");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue,"topics","user.#");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2"+new String(body));
}
});
}
}
以上是关于RabbitMQ支持的消息模型的主要内容,如果未能解决你的问题,请参考以下文章