RabbitMQ学习笔记

Posted 咬鱼的胖橘猫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记相关的知识,希望对你有一定的参考价值。

目录

前言

首先,感谢学相伴的平台以及飞哥的知识分享。

本文的笔记整理于【学相伴】RabbitMQ最新完整教程IDEA版通俗易懂

本人才疏学浅,如果本文有错误之处,欢迎指正。

最后的碎碎念:

飞哥说的还是蛮不错的,认真听下去会很有收获的。我感觉飞哥的整体结构很清晰,有自己的理解成分在里面,就感觉看完之后就把知识点大纲理顺了,只不过讲述的知识点以及代码编写没有尚硅谷的那么详细。相应的,尚硅谷的就很乱,看了很蒙蔽。
个人推荐,跟B站尚硅谷发布的RabbitMQ 课件文档 1 食用更加,可以补充知识点。

一、MQ入门

为什么消息中间件采用的是http协议?

  1. http协议是比较复杂的,包含了cookie、数据的加密解密、状态码等,但是一个消息不需要这个复杂也没这个必要,主要追求的是高效、简洁、快捷;
  2. http协议是短链接,在实际的过程中,可能会出现消息的中断,不会持久化。而消息中间件需要的就是对出故障的消息进行持久化;

1.1 消息中间件的协议

1. AMQP协议

分布式事务;

消息的持久化;

高性能、高可靠的处理优势;

2. MQTT协议

物联网的重要组成部分。

低延迟、低带宽、不支持事务

3. OpenMessage协议

RocketMQ采用的协议。国内的阿里、雅虎等公司一起创作。

支持事务,持久化

4. Kafka协议

基于TCP/IP协议,采用二进制进行传输。

结构简单,不支持事务,支持持久化

1.2 消息分发机制

ActiveMQRabbitMQKafkaRocketMQ
发布订阅
轮询分发
公平分发
重发
消息拉取

轮询分发、公平分发它们都是保证消息只能够读取一次。

轮询分发:每个消费者消费的消息总数量是一致的;

公平分发:能者多劳,消费者性能好,处理的请求就会比较多;必须手动应答,不支持自动应答

1.3 消息的高可用

集群模式1:Master-Slave:主从共享数据

生产者将消息发送到主节点,所有的都节点连接这个消息队列共享这块的数据区域。主节点写入,一旦主节点挂掉,从节点继续服务。

集群模式2:Master-Slave:主从同步数据

与Redis的主从同步差不多

集群模式3:多主集群同步部署模式

与2差不多,写入是可以任意节点进行写入。

集群模式4:多主集群转发部署模式

元数据共享,当查找数据的时候,就会判断消息的元数据是否存在,存在则返回,否则就去问其他的消费者。

集群模式5:Master-Slave与Broker-Cluster组合方案

集群模式的总结

  • 消息共享
  • 消息同步
  • 元数据共享

1.4 消息的高可靠

  • 消息的传输:协议保证

  • 消息的存储:持久化

1.5 MQ的使用场景

流量消峰、应用解耦、异步处理

在说这个部分的时候,跟自己的业务结合一起去阐述三个场景。

二、RabbitMQ

2.1 RabbitMQ安装

Linux安装视频:https://www.bilibili.com/video/BV1dX4y1V73G?p=9

Windows安装文章:https://www.cnblogs.com/saryli/p/9729591.html

Docker安装视频:https://www.bilibili.com/video/BV1dX4y1V73G?p=10

2.2 RabbitMQ的核心概念

生产者、交换机、队列、消费者

2.3 RibbitMQ的核心组成部分

队列可以没有交换机吗?

不可以。没有指明交换机的时候,有一个默认的AMQP default交换机绑定队列,且默认的交换机是路由模式。

2.3 RabbitMQ的运行流程

三、简单入门

RabbitMQ的七种模式,其中前五种一定要掌握

2.1 简单模式

生产者:

public class Producer 

    public static void main(String[] args) 
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try 
            // 2. 创建链接
            connection = connectionFactory.newConnection("生产者");
            // 3. 通过链接获取通道
            channel = connection.createChannel();
            // 4. 通过通道,创建交换机、队列、绑定关系、路由key,发送消息以及接受消息
            // 队列名,持久化,排他性,自动删除,携带额外的参数
            // 将autoDelete设置为true ,当最后一个消费者消费完后并断开连接后  队列会自动进行删除
            String queueName = "queue1";
            channel.queueDeclare(queueName, false, false, false, null);
            // 5. 准备消息内容
            String message = "hello world";
            // 6. 发送消息给队列
            channel.basicPublish("", queueName, null, message.getBytes());
         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        finally 
            // 7. 关闭通道
            if (channel!= null && channel.isOpen())
                try 
                    channel.close();
                catch (Exception ex)
                    ex.printStackTrace();
                
            
            // 8. 关闭链接
            if (connection!= null && connection.isOpen())
                try 
                    connection.close();
                catch (Exception ex)
                    ex.printStackTrace();
                
            
        
    


消费者:

public class Consumer 

    public static void main(String[] args) 
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try 
            // 2. 创建链接
            connection = connectionFactory.newConnection("生产者");
            // 3. 通过链接获取通道
            channel = connection.createChannel();
            // 4.通过通道,创建交换机、队列、绑定关系、路由key,发送消息以及接受消息
            // 队列名,持久化,排他性,自动删除,携带额外的参数
            channel.basicConsume("queue1", true, new DeliverCallback() 
                        public void handle(String consumerTag, Delivery message) throws IOException 
                            System.out.println("收到的消息是:" + new String(message.getBody(), "utf-8"));
                        
                    , new CancelCallback() 
                        public void handle(String consumerTag) throws IOException 
                            System.out.println("消息接受失败");
                        
                    
            );

            System.out.println("消息阻断");
            System.in.read();
         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        finally 
            // 7. 关闭通道
            if (channel!= null && channel.isOpen())
                try 
                    channel.close();
                catch (Exception ex)
                    ex.printStackTrace();
                
            
            // 8. 关闭链接
            if (connection!= null && connection.isOpen())
                try 
                    connection.close();
                catch (Exception ex)
                    ex.printStackTrace();
                
            
        
    


/**
 * 消息阻断
 * 接收到的消息是:hello world
 */
  • 是否持久化:在服务重启之后,队列是否会消失?

持久化肯定是存入磁盘。但是非持久化也会存入磁盘,但是重启之后,会消失;

  • 是否自动删除?

自动删除,是队列中的最后一个消息被消费时。

如果设置了自动删除,那么就会未持久化的队列就会自动删除。

如果没有设置自动删除,那么未持久化的队列还会存在,直到重启。

2.2 四种exchage

1 . fanout类型

发布与订阅模式。

交换机为fanout类型,通过交换机,向他下面的队列都发送一样的消息。

指定路由key是没有意义的,仍然会所有的队列都会收到消息。

1) 通过图形化界面操作

创建faout的交换机

如下图所示:

添加新的队列

queue2、queue3,如下图所示:

交换机与队列之间的绑定

web图像化界面绑定:

在队列里面进行绑定:

在交换机里面进行绑定:

发布消息

因为我这里是发布订阅模式,所以没有路由key。

查看结果


queue2、queue3:

2) 核心代码

消费者1:

// 声明该通道的名称以及类型,是否持久化
String exchangeName = "faout-exchang";
String type = "fanout";
channel.exchangeDeclare(exchangeName, type,true );
// 声明队列
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, exchangeName, "");
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

消费者2:

// 声明该通道的名称以及类型
String exchangeName = "faout-exchang";
String type = "fanout";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, exchangeName, "");
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

生产者:

// 声明该通道的名称以及类型
String exchangeName = "faout-exchang";
// 声明该通道的名称以及类型
channel.exchangeDeclare(exchangeName, "fanout");
// 声明该通道的交换机的类型
channel.basicPublish("faout-exchang", "", null, "hello world".getBytes());

2. direct类型

路由模式。

空的交换机有默认交换机,direct模式。

direct类型的交换机通过队列设置不同的Routing key ,来接受不同的消息。

交换机在发消息的时候,通过指定的不同的 Routing key ,来转发到指定的 Routing key的队列。

核心代码

消费者1:

// 声明该通道的名称以及类型
String exchangeName = "direct-exchange";
String type = "direct";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey = "error";
channel.queueBind(queueName, exchangeName, routingkey);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

消费者2:

// 声明该通道的名称以及类型
String exchangeName = "direct-exchange";
String type = "direct";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey1 = "warning";
String routingkey2 = "info";
channel.queueBind(queueName, exchangeName, routingkey1);
channel.queueBind(queueName, exchangeName, routingkey2);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

生产者:

// 声明该通道的名称以及类型
String exchangeName = "direct-exchange";
String type = "direct";
String routingkey = "warning";// "info","error"
// 声明该通道的名称以及类型
channel.exchangeDeclare(exchangeName, type);
// 声明该通道的交换机的类型
channel.basicPublish(exchangeName, routingkey, null, "hello world".getBytes());

3. topic类型

主题模式

设置交换机为topic类型,通过Routing key模式匹配来分发队列里面的消息。

* : 代表着必须有1级别

# : 代表着0个或者多个级别

Q1:

*.orange.* 代表:前面有1级,orange,后面有1级,

Q2:

*.*.rabbit代表:前面有1级,中间有1级,rabbit

lazy.# 代表:lazy后面可以有0个或者多个级别都是匹配的

测试:

com.lazy.orange : 没人收到消息

lazy.orange:Q2 收到消息,消息是匹配于lazy.#

lazy.orange.rabbit.com:Q2收到消息,消息是匹配于lazy.#

lazy.orange.rabbit:Q1 以及 Q2 均收到消息

核心代码

消费者1

// 声明该通道的名称以及类型
String exchangeName = "headers-exchange";
String type = "headers";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey = "*.orange.*";
channel.queueBind(queueName, exchangeName, routingkey);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

消费者2

// 声明该通道的名称以及类型
String exchangeName = "topic-exchange";
String type = "topic";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue2";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey1 = "*.*.rabbit";
String routingkey2 = "lazy.#";
channel.queueBind(queueName, exchangeName, routingkey1);
channel.queueBind(queueName, exchangeName, routingkey2);
// 发送消息给队列
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

生产者:

// 声明该通道的名称以及类型
String exchangeName = "topic-exchange";
String type = "topic";
String routingkey = "com.lazy.orange";// "lazy.orange"....
// 声明该通道的名称以及类型
channel.exchangeDeclare(exchangeName, type);
// 声明该通道的交换机的类型
channel.basicPublish(exchangeName, routingkey, null, "hello world".getBytes());

4. Headers类型

交换机为Headers模式,队列设置不同的参数。

在发送消息的时候,通过不同的参数来进行消息转达到不同的队列。

ToRouting keyArguments
queue1x:1 y:1
queue2x:1
queue3x:2 y:1

x=1,Q2收到了

x=1,y=1,Q1、Q2都收到了

x=2,都没有队列匹配

核心代码

消费者

// 声明该通道的名称以及类型
String exchangeName = "topic-exchange";
String type = "topic";
channel.exchangeDeclare(exchangeName, type);
// 声明队列
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
// 临时队列绑定交换机,其中 routingkey(也称之为 binding key)
String routingkey = "";
channel.queueBind(queueName, exchangeName, "");
// queue1的参数组
Map<String, Object> header = new HashMap<String, Object>();
header.put("x-match", "all");  //x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。
header.put("name", "张三");
header.put("idcard","123321");
// 消息消费
Consumer consumer = new DefaultConsumer(channel)
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQ

以上是关于RabbitMQ学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习(中)——交换机死信队列和延迟队列

RabbitMQ学习09--死信队列(TTL过期)

RabbitMQ学习--死信队列

RabbitMQ学习--死信队列

RabbitMQ学习教程二(交换机,死信队列)

RabbitMQ一文带你搞定RabbitMQ延迟队列