RabbitMQ学习笔记
Posted 咬鱼的胖橘猫
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记相关的知识,希望对你有一定的参考价值。
目录
前言
首先,感谢学相伴的平台以及飞哥的知识分享。
本文的笔记整理于【学相伴】RabbitMQ最新完整教程IDEA版通俗易懂。
本人才疏学浅,如果本文有错误之处,欢迎指正。
最后的碎碎念:
飞哥说的还是蛮不错的,认真听下去会很有收获的。我感觉飞哥的整体结构很清晰,有自己的理解成分在里面,就感觉看完之后就把知识点大纲理顺了,只不过讲述的知识点以及代码编写没有尚硅谷的那么详细。相应的,尚硅谷的就很乱,看了很蒙蔽。
个人推荐,跟B站尚硅谷发布的RabbitMQ 课件文档 1 食用更加,可以补充知识点。
一、MQ入门
为什么消息中间件采用的是http协议?
- http协议是比较复杂的,包含了cookie、数据的加密解密、状态码等,但是一个消息不需要这个复杂也没这个必要,主要追求的是高效、简洁、快捷;
- http协议是短链接,在实际的过程中,可能会出现消息的中断,不会持久化。而消息中间件需要的就是对出故障的消息进行持久化;
1.1 消息中间件的协议
1. AMQP协议
分布式事务;
消息的持久化;
高性能、高可靠的处理优势;
2. MQTT协议
物联网的重要组成部分。
低延迟、低带宽、不支持事务
3. OpenMessage协议
RocketMQ采用的协议。国内的阿里、雅虎等公司一起创作。
支持事务,持久化
4. Kafka协议
基于TCP/IP协议,采用二进制进行传输。
结构简单,不支持事务,支持持久化
1.2 消息分发机制
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|---|
发布订阅 | √ | √ | √ | √ |
轮询分发 | √ | √ | √ | |
公平分发 | √ | √ | ||
重发 | √ | √ | √ | |
消息拉取 | √ | √ | √ |
轮询分发、公平分发它们都是保证消息只能够读取一次。
轮询分发:每个消费者消费的消息总数量是一致的;
公平分发:能者多劳,消费者性能好,处理的请求就会比较多;必须手动应答,不支持自动应答
1.3 消息的高可用
集群模式1:Master-Slave:主从共享数据
生产者将消息发送到主节点,所有的都节点连接这个消息队列共享这块的数据区域。主节点写入,一旦主节点挂掉,从节点继续服务。
集群模式2:Master-Slave:主从同步数据
与Redis的主从同步差不多
集群模式3:多主集群同步部署模式
与2差不多,写入是可以任意节点进行写入。
集群模式4:多主集群转发部署模式
元数据共享,当查找数据的时候,就会判断消息的元数据是否存在,存在则返回,否则就去问其他的消费者。
集群模式5:Master-Slave与Broker-Cluster组合方案
集群模式的总结
- 消息共享
- 消息同步
- 元数据共享
1.4 消息的高可靠
-
消息的传输:协议保证
-
消息的存储:持久化
1.5 MQ的使用场景
流量消峰、应用解耦、异步处理
在说这个部分的时候,跟自己的业务结合一起去阐述三个场景。
二、RabbitMQ
2.1 RabbitMQ安装
Linux安装视频:https://www.bilibili.com/video/BV1dX4y1V73G?p=9
Windows安装文章:https://www.cnblogs.com/saryli/p/9729591.html
Docker安装视频:https://www.bilibili.com/video/BV1dX4y1V73G?p=10
2.2 RabbitMQ的核心概念
生产者、交换机、队列、消费者
2.3 RibbitMQ的核心组成部分
队列可以没有交换机吗?
不可以。没有指明交换机的时候,有一个默认的AMQP default交换机绑定队列,且默认的交换机是路由模式。
2.3 RabbitMQ的运行流程
三、简单入门
RabbitMQ的七种模式,其中前五种一定要掌握
2.1 简单模式
生产者:
public class Producer
public static void main(String[] args)
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try
// 2. 创建链接
connection = connectionFactory.newConnection("生产者");
// 3. 通过链接获取通道
channel = connection.createChannel();
// 4. 通过通道,创建交换机、队列、绑定关系、路由key,发送消息以及接受消息
// 队列名,持久化,排他性,自动删除,携带额外的参数
// 将autoDelete设置为true ,当最后一个消费者消费完后并断开连接后 队列会自动进行删除
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 5. 准备消息内容
String message = "hello world";
// 6. 发送消息给队列
channel.basicPublish("", queueName, null, message.getBytes());
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
finally
// 7. 关闭通道
if (channel!= null && channel.isOpen())
try
channel.close();
catch (Exception ex)
ex.printStackTrace();
// 8. 关闭链接
if (connection!= null && connection.isOpen())
try
connection.close();
catch (Exception ex)
ex.printStackTrace();
消费者:
public class Consumer
public static void main(String[] args)
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try
// 2. 创建链接
connection = connectionFactory.newConnection("生产者");
// 3. 通过链接获取通道
channel = connection.createChannel();
// 4.通过通道,创建交换机、队列、绑定关系、路由key,发送消息以及接受消息
// 队列名,持久化,排他性,自动删除,携带额外的参数
channel.basicConsume("queue1", true, new DeliverCallback()
public void handle(String consumerTag, Delivery message) throws IOException
System.out.println("收到的消息是:" + new String(message.getBody(), "utf-8"));
, new CancelCallback()
public void handle(String consumerTag) throws IOException
System.out.println("消息接受失败");
);
System.out.println("消息阻断");
System.in.read();
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
finally
// 7. 关闭通道
if (channel!= null && channel.isOpen())
try
channel.close();
catch (Exception ex)
ex.printStackTrace();
// 8. 关闭链接
if (connection!= null && connection.isOpen())
try
connection.close();
catch (Exception ex)
ex.printStackTrace();
/**
* 消息阻断
* 接收到的消息是:hello world
*/
- 是否持久化:在服务重启之后,队列是否会消失?
持久化肯定是存入磁盘。但是非持久化也会存入磁盘,但是重启之后,会消失;
- 是否自动删除?
自动删除,是队列中的最后一个消息被消费时。
如果设置了自动删除,那么就会未持久化的队列就会自动删除。
如果没有设置自动删除,那么未持久化的队列还会存在,直到重启。
2.2 四种exchage
1 . fanout类型
发布与订阅模式。
交换机为fanout类型,通过交换机,向他下面的队列都发送一样的消息。
指定路由key是没有意义的,仍然会所有的队列都会收到消息。
1) 通过图形化界面操作
创建faout的交换机
如下图所示:
添加新的队列
queue2、queue3,如下图所示:
交换机与队列之间的绑定
web图像化界面绑定:
在队列里面进行绑定:
在交换机里面进行绑定:
发布消息
因为我这里是发布订阅模式,所以没有路由key。
查看结果
queue2、queue3:
2) 核心代码
消费者1:
// 声明该通道的名称以及类型,是否持久化
String exchangeName = "faout-exchang";
String type = "fanout";
channel.exchangeDeclare(exchangeName, type,true );
// 声明队列
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, exchangeName, "");
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
消费者2:
// 声明该通道的名称以及类型
String exchangeName = "faout-exchang";
String type = "fanout";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, exchangeName, "");
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
生产者:
// 声明该通道的名称以及类型
String exchangeName = "faout-exchang";
// 声明该通道的名称以及类型
channel.exchangeDeclare(exchangeName, "fanout");
// 声明该通道的交换机的类型
channel.basicPublish("faout-exchang", "", null, "hello world".getBytes());
2. direct类型
路由模式。
空的交换机有默认交换机,direct模式。
direct类型的交换机通过队列设置不同的Routing key ,来接受不同的消息。
交换机在发消息的时候,通过指定的不同的 Routing key ,来转发到指定的 Routing key的队列。
核心代码
消费者1:
// 声明该通道的名称以及类型
String exchangeName = "direct-exchange";
String type = "direct";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey = "error";
channel.queueBind(queueName, exchangeName, routingkey);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
消费者2:
// 声明该通道的名称以及类型
String exchangeName = "direct-exchange";
String type = "direct";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey1 = "warning";
String routingkey2 = "info";
channel.queueBind(queueName, exchangeName, routingkey1);
channel.queueBind(queueName, exchangeName, routingkey2);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
生产者:
// 声明该通道的名称以及类型
String exchangeName = "direct-exchange";
String type = "direct";
String routingkey = "warning";// "info","error"
// 声明该通道的名称以及类型
channel.exchangeDeclare(exchangeName, type);
// 声明该通道的交换机的类型
channel.basicPublish(exchangeName, routingkey, null, "hello world".getBytes());
3. topic类型
主题模式
设置交换机为topic类型,通过Routing key模式匹配来分发队列里面的消息。
* : 代表着必须有1级别
# : 代表着0个或者多个级别
Q1:
*.orange.* 代表:前面有1级,orange,后面有1级,
Q2:
*.*.rabbit代表:前面有1级,中间有1级,rabbit
lazy.# 代表:lazy后面可以有0个或者多个级别都是匹配的
测试:
com.lazy.orange : 没人收到消息
lazy.orange:Q2 收到消息,消息是匹配于lazy.#
lazy.orange.rabbit.com:Q2收到消息,消息是匹配于lazy.#
lazy.orange.rabbit:Q1 以及 Q2 均收到消息
核心代码
消费者1
// 声明该通道的名称以及类型
String exchangeName = "headers-exchange";
String type = "headers";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey = "*.orange.*";
channel.queueBind(queueName, exchangeName, routingkey);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
消费者2
// 声明该通道的名称以及类型
String exchangeName = "topic-exchange";
String type = "topic";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey1 = "*.*.rabbit";
String routingkey2 = "lazy.#";
channel.queueBind(queueName, exchangeName, routingkey1);
channel.queueBind(queueName, exchangeName, routingkey2);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
生产者:
// 声明该通道的名称以及类型
String exchangeName = "topic-exchange";
String type = "topic";
String routingkey = "com.lazy.orange";// "lazy.orange"....
// 声明该通道的名称以及类型
channel.exchangeDeclare(exchangeName, type);
// 声明该通道的交换机的类型
channel.basicPublish(exchangeName, routingkey, null, "hello world".getBytes());
4. Headers类型
交换机为Headers模式,队列设置不同的参数。
在发送消息的时候,通过不同的参数来进行消息转达到不同的队列。
To | Routing key | Arguments |
---|---|---|
queue1 | x:1 y:1 | |
queue2 | x:1 | |
queue3 | x:2 y:1 |
x=1,Q2收到了
x=1,y=1,Q1、Q2都收到了
x=2,都没有队列匹配
核心代码
消费者
// 声明该通道的名称以及类型
String exchangeName = "topic-exchange";
String type = "topic";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey = "";
channel.queueBind(queueName, exchangeName, "");
// queue1的参数组
Map<String, Object> header = new HashMap<String, Object>();
header.put("x-match", "all"); //x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。
header.put("name", "张三");
header.put("idcard","123321");
// 消息消费
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQ以上是关于RabbitMQ学习笔记的主要内容,如果未能解决你的问题,请参考以下文章