RabbitMQ学习笔记
Posted Shinka_YXS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记相关的知识,希望对你有一定的参考价值。
视频教程【编程不良人】MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程
1.MQ引言
1.1什么是MQ
MQ(Message Quene):消息队列,也叫消息中间件。
通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。
消息的生产和消费都是异步的,只关心消息的发送和接收,没有业务逻辑的侵入,实现系统间解耦。
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
1.2不同MQ特点
-
ActiveMQ
Apache出品,最流行的,能力强劲的开源消息总线。是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式。
性能受人诟病,吞吐量不高,适合中小型企业。 -
Kafka
LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。
主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,最初目的就是用于日志收集和传输。
0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。 -
RocketMQ
阿里开源的消息中间件,它是纯Java开发,高吞吐量、高可用性、适合大规模分布式系统应用。
它的思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。 -
RabbitMQ
使用Erlang语言开发的开源消息队列系统,基于AMOP协议来实现。(erlang做socket编程很不错)
AMOP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMOP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
2.RabbitMQ引言
基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
RabbitMQ可以和Spring框架无缝整合。
AMQP协议
AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领域不同平台之间的消息传递交互问题。
AMQP是一种协议,更准确的说是一种binary wirelevel protoco1(链接协议),这是其和JMS的本质差别。
AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AOP的provider天然性就是跨平台的。
3.RabbitMQ配置
3.1RabbitMQ管理命令行
# 服务启动相关
systemctl start|restart|stop|status rabbitmq-server
# 管理命令行用来 在不使用web管理界面情况下 命令操作RabbitMQ
rabbitmqctl help # 可以查看更多命令
# 插件管理命令行
rabbitmq-plugins enable|list|disable
3.2 web管理界面介绍
3.2.1 web管理界面介绍
【Exchanges 交换机】AMQP里面比较有名的 交换机 也叫路由。在RabbitMQ刚安装成功之后 默认内置了7种路由
AMQP默认协议为direct(直联)
Features的“D”代表“durable(持久)”,意指日后直联的消息是存在磁盘中的,即不会随着RabbitMQ的重启或错误而丢失数据。
后面6个不同交换机的名字对应6个不同特性的交换机:direct直联、fanout广播、headers以头的形式、match以匹配的形式、topic订阅、trace追踪等模式、
4.RabbitMQ的第一个程序
4.1AMQP协议回顾
生产者去与RabbitMQ的Server建立连接,之后会在连接里以通道的形式传递消息
每一个生产者会对应一个专门的虚拟主机Virtual Host,可以把虚拟主机想象成关系型数据库中“库”的概念、
若想访问到一个具体的虚拟主机,需要将虚拟主机与用户进行绑定,所以先创建用户 再创建虚拟主机 然后将虚拟主机与用户绑定、
消息会被放到交换机中还是直接到Queue中,取决于使用哪种消息模型、
消费者与生产者是完全解耦的,消费者不需要关心生产者有没有运行,只需要关心其监听的队列有没有消息即可、
4.2RabbitMQ支持的消息模型
https://www.rabbitmq.com/getstarted.html
1 "Hello World!"点对点、生产者直接发消息给队列 不经过任何交换机
2 Work queues广播、
3 Publish/Subscribe发布和订阅、
4 Routing路由、
5 Topics基于动态路由去做订阅、
6 RPC、
7 Publisher Confirms发布确认模式、
4.3Java实现
工具类
public class MyRabbitMQUtil
private static ConnectionFactory connectionFactory;
static
// 重量级资源 类加载时执行 且仅执行一次
// 创建连接mq的连接工厂对象
connectionFactory = new ConnectionFactory();
// 设置连接主机、端口号、虚拟主机、访问虚拟主机的用户名密码
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test-vh");
connectionFactory.setUsername("testUser");
connectionFactory.setPassword("testUser");
/**
* 提供连接对象
*
* @return
*/
public static Connection getConnection()
try
return connectionFactory.newConnection();
catch (Exception e)
e.printStackTrace();
return null;
/**
* 关闭通道和连接
* @param channel
* @param connection
*/
public static void closeCollectionAndChannel(Channel channel, Connection connection)
try
if (channel != null)
channel.close();
if (connection != null)
connection.close();
catch (Exception e)
e.printStackTrace();
模型一(直联)
生产者用默认的交换机把消息发送到和routingKey名称相同的队列中,由消费者去队列中获取消息进行消费
P:生产者,也就是要发送消息的程序
C:消费者,消息的接受者,会一直等待消息到来。
queue︰消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
// 生产消息
@Test
public void sendMessageTest() throws IOException, TimeoutException
// 获取连接对象
Connection connection = MyRabbitMQUtil.getConnection();
// 创建连接中的通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
// 参数1:queue 队列名称,若不存在则自动创建
// 参数2:durable 队列特性是否持久化
// 参数3:exclusive 是否独占队列
// 参数4:autoDelete 是否在消费完成后自动删除队列。设置为true时队列中没有消息且没有消费者在监听 该队列才会被删除
// 参数5:arguments 额外附加参数
channel.queueDeclare("hello", false, false, false, null);
// 发布消息
// 参数1:exchange 交换机名称
// 参数2:routingKey 路由名称。此处用默认的交换机把消息发送到和routingKey名称相同的队列中
// 参数3:props 传递消息额外设置,若需要消息持久化,传入参数MessageProperties.PERSISTENT_TEXT_PLAIN
// 参数4:body 消息的具体内容
channel.basicPublish("", "hello", null, "hello rabbitMQ~".getBytes());
channel.close();
connection.close();
消费者
// 消费消息
public static void main(String[] args) throws IOException, TimeoutException
// 获取连接对象
Connection connection = MyRabbitMQUtil.getConnection();
// 创建连接中的通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
// 参数1:队列名称 消费哪个队列的消息
// 参数2:开启消息的自动确认机制
// 参数3:消费时的回调接口,此处用匿名内部类实现
channel.basicConsume("hello", true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body内容:"+ new String(body));
);
// 希望customer一直监听,可把以下两行注释掉
// channel.close();
// connection.close();
API参数细节
// 通道绑定对应消息队列
// 参数1:queue 声明通道对应的队列
// 参数2:durable 是否持久化队列
// 参数3:exclusive 是否独占队列
// 参数4:autoDelete 是否在消费完成后自动删除队列
// 参数5:arguments 对队列的额外配置
channel.queueDeclare("hello", false, false, false, null);
模型二(work queue)
Work queues(工作模型)也被称为Task queues(任务模型)。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息会堆积越来越多,无法及时处理。
此时可使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务不会被重复执行。
角色:
- P:生产者,任务的发布者
- C1:消费者1,领取任务并且完成任务,假设完成速度较慢
- C2:消费者2,领取任务并完成任务,假设完成速度快
生产者
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 10; i++)
channel.basicPublish("", "work", null, (i+"-work").getBytes());
消费者1
// 通道绑定对应消息队列
channel.queueDeclare("work", false, false, false, null);
// 消费消息 参数1:队列名称;参数2:消息自动确认,true表示消费者自动向RabbitMQ确认消费
channel.basicConsume("work", true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("消费者1:" + new String(body));
);
消费者2
与消费者1代码相同,第8行打印“消费者2”。
测试结果
总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
消息自动确认机制
Message acknowledgment(消息确认)-rabbitMQ官网介绍
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
在当前代码中,开启了自动确认,那么一旦RabbitMQ将消息传递给消费者,它就会立即将其标记为删除。
在这种情况下,如果一个消费者只处理了部分消息就宕机,我们将丢失它正在处理的消息 以及 发送给该特定工作进程但尚未处理的所有消息。但我们希望如果一个消费者宕机,把任务交给其他消费者。
需要修改:
1.关闭自动确认机制
2.告诉消息队列不能一次性地把消息都给当前消费者,让消费者一个通道只能消费一个消息。
消费者1
Connection connection = MyRabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 每次只能消费一个消息
channel.basicQos(1);
channel.queueDeclare("work", false, false, false, null);
// 消费消息 参数1:队列名称;参数2:消息自动确认,true表示消费者自动向RabbitMQ确认消费
channel.basicConsume("work", false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
try
Thread.sleep(2000);
catch (Exception e)
e.printStackTrace();
System.out.println("消费者1:" + new String(body));
// 手动确认 参数1:手动确认参数标识,即确认队列中哪个具体的消息 参数2:false表示每次只确认一个
channel.basicAck(envelope.getDeliveryTag(), false);
);
消费者2
Connection connection = MyRabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 每次只能消费一个消息
channel.basicQos(1);
channel.queueDeclare("work", false, false, false, null);
// 消费消息 参数1:队列名称;参数2:消息自动确认,true表示消费者自动向RabbitMQ确认消费
channel.basicConsume("work", false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("消费者2:" + new String(body));
// 手动确认 参数1:手动确认参数标识,即确认队列中哪个具体的消息 参数2:false表示每次只确认一个
channel.basicAck(envelope.getDeliveryTag(), false);
);
测试结果
模型三(Publish/Subscribe)
fanout 扇出 也就是广播
Publish/Subscribe 发布/订阅 模型,也是广播模型
消息发送流程
- 可有多个消费者
- 每个消费者有自己的queue(队列),queue可以是临时的(没有消息会自动删除)
- 每个队列都要绑定到exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费。
生产者
// 获取连接和通道对象
Connection connection = MyRabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 将通道声明指定交换机 参数1:交换机名称,不存在则创建;参数2:交换机类型,fanout是广播类型
channel.exchangeDeclare("testExchangeName", "fanout");
// 发送消息 在广播类型下routingKey是没有意义的,还是会广播
channel.basicPublish("testExchangeName", "", null, "这是一条消息".getBytes());
// 关闭资源
MyRabbitMQUtil.closeCollectionAndChannel(channel, connection);
运行后就创建了一个fanout类型的交换机testExchangeName
消费者1
Connection connection = MyRabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 将通道绑定交换机
channel.exchangeDeclare("testExchangeName", "fanout");
// 临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queueName, "testExchangeName", "");
// 消费消息 参数1:队列名称;参数2:消息自动确认,true表示消费者自动向RabbitMQ确认消费
channel.basicConsume(queueName, true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("Customer1-body内容:" + new String(body));
);
消费者2 和 消费者3 与 消费者1 相同,打印的内容不同。
测试结果
先启动三个消费者,再启动生产者,最后三个消费者都消费了生产的发布的消息。
模型四(Routing)
1)Routing之订阅模型-Direct(直连)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。
但在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定 不能是任意绑定,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在向Exchange发送消息时,也必须指定消息的
RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的
RoutingKey
进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致才会接收到消息
流程:
图解:
- P:生产者,向Exchange发送消息时,会指定一个RoutingKey
- X:Exchange (交换机),接收生产者的消息,然后把消息递交给与RoutingKey完全匹配的队列
- C1:消费者,其所在队列指定了需要RoutingKey为error的消息
- C2:消费者,其所在队列指定了需要RoutingKey为info、error、warning 的消息
生产者
// 获取连接和通道对象...
String exchangeName = "testExchangeName_direct";
// 将通道声明指定交换机 参数1:交换机名称,不存在则创建;参数2:交换机类型,direct是路由模式
channel.exchangeDeclare(exchangeName, "direct");
// 发送消息
String routingKey = "info";
channel.basicPublish(exchangeName, routingKey, null,
("这是direct模型基于routingKey["+routingKey+"]发布的消息").getBytes());
// 关闭资源...
消费者1
String exchangeName = "testExchangeName_direct";
// 将通道绑定交换机
channel.exchangeDeclare(exchangeName, "direct");
// 临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queueName, exchangeName, "error");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("Customer1-body内容:" + new String(body));
);
消费者2
String exchangeName = "testExchangeName_direct";
// 将通道绑定交换机
channel.exchangeDeclare(exchangeName, "direct");
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 临时队列和交换机绑定
channel.queueBind(queueName, exchangeName, "info");
channel.queueBind(queueName, exchangeName, "error");
channel.queueBind(queueName, exchangeName, "warning");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("Customer2-body内容:" + new String(body));
);
测试结果
先启动两个消费者,再启动生产者,消费者只消费其绑定RoutingKey的消息
当生产者发布消息的routingKey为info时:
当生产者发布消息的routingKey为error时:
2)Routing之订阅模型-Topic
Topic与Direct类型的Exchange都是可以根据RoutingKey把消息路由到不同的队列。
Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以"."分割,例如:item.insert
- * (star) can substitute for exactly one word. 匹配1个词
- # (hash) can substitute for zero or more words. 匹配任意个词
生产者
// 获取连接和通道对象
String exchangeName = "testExchangeName_topic";
// 将通道声明指定交换机 参数1:交换机名称,不存在则创建;参数2:交换机类型,topic是路由模式
channel.exchangeDeclare(exchangeName, "topic");
// 发送消息
String routingKey = "user.save";
channel.basicPublish(exchangeName, routingKey, null,
("这是topic动态路由模型基于routingKey["+routingKey+"]发布的消息").getBytes());
// 关闭资源
消费者
String exchangeName = "testExchangeName_topic";
// 将通道绑定交换机
channel.exchangeDeclare(exchangeName, "topic");
// 临时队列
String q以上是关于RabbitMQ学习笔记的主要内容,如果未能解决你的问题,请参考以下文章