RabbitMQ入门笔记

Posted 在编程的路上跌跌撞撞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门笔记相关的知识,希望对你有一定的参考价值。

文章目录

第一章 安装RabbitMQ

RabbitMQ是一个消息中间件,遵循AMQP协议 (Advanced Message Queuing Protocol)。由Erlang语言开发,所以安装rabbitmq之前先安装Erlang环境。

Erlang与rabbitmq的版本必须匹配,版本匹配信息可以查看rabbit官网
官网 https://www.rabbitmq.com/which-erlang.html

1.1 安装Erlang

网址:https://www.erlang.org/downloads

右边是对应的版本信息,中间是选择下载的系统环境。

下载成功,点击安装,配置环境。
打开电脑的环境配置,变量值是erlang的安装路径。

修改Path的值,点击新建 复制 %ERLANG_HOME%\\bin
然后保存,这里的作用是为了在cmd窗口能够执行erlang的命令。

测试,win + r 打开cmd窗口,输入 erl -v
出现版本号表示安装成功。

1.2 安装 RabbitMQ

下载地址 https://www.rabbitmq.com/install-windows.html


往下滑,选择下载,这个下载资源来自GitHub,比较慢,可以找其他渠道下载。

下载安装成功之后,进入安装目录。


点击上方的目录,输入cmd 加一个空格 然后回车。

执行如下命令,安装管理插件 rabbitmq-plugins enable rabbitmq_management

执行命令 查看安装状态 rabbitmqctl status

点击运行脚本,运行服务器。

访问 后台管理地址 http://localhost:15672/#/
登录账号与密码都是 guest

登录成功如下

到此安装rabbitmq安装成功。

1.3 RabbitMQ后台管理系统

1.3.1 概述

由于当前没有进行任何配置,所以有效内容没有数据。

Totals:显示消息在单位时间内的数据情况。
Node:显示电脑的磁盘、内存等信息,选择表单的最右边加减号图标即可增加或者减少要识别的内容。

对连接,信道,队列流失统计

端口与上下文信息

配置文件导入与导出。

1.3.2 连接

信息的生产者与消费者的连接状态都在这里,由于没有创建,所以,没有任何信息。

1.3.3 信道

1.3.4 交换机

1.3.4 队列

1.3.4 admin

用于添加用户与虚拟主机。

1.4 配置用户与虚拟主机

为什么要配置用户与虚拟主机呢?
如果你是一个人开发,就你一个人那就没有问题,如果有很多人开发,他们都要查看rabbitmq时,只有一个账号是不行的,所以,就要配置一个用户,那虚拟机呢,主要是隔离每一个队列的配置。如,一个用户把自己的所有消息服务配置在一个虚拟主机当中,很容易管理各个的配置信息。

1.4.1 配置用户

如果不使用guest,我们也可以自己创建一个用户
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。

添加成功如下,点击用户名配置权限。

1.4.2 新增虚拟主机

主机的名称必须是 /开头

添加成功如下

设置虚拟主机的访问权限,如哪些用户能访问这个虚拟主机。

配置好之后,退出登录,使用刚刚配置好的用户登录虚拟主机。
退出登录


查看地址栏并登录,显示登录成功。

第二章 RabbitMQ 配置

2.1 rabbitmq 架构

组成部分说明:

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。


生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复

点击Tutorials进入官网教程

进入之后,有7中消息模式,下面的语言是对应的案例实现。

2.2 web后台创建消息队列

2.2.1 基本消息模式 (交换机类型:direct)

这种模式没有交换机,直接生产消息,直接消费。


创建成功,如果没有出现队列,则查看当前页面的虚拟主机属性

配置队列,点击队列名称。

发送消息,其他默认

发布成功后,概述出现一条消息

获取消息

查看概述,消息已被消费

2.2.2 work queue 消息模式

work模式属于第一种的模式升级版,可以多个消费队列去消费生产者发布的消息,而且消息只能被一个消费者消费,消费者队列属于竞争关系,该模式需要用代码实现,web后台管理暂时不实现。

2.2.3 Publish/subscribe 模式(交换机类型:Fanout)


发现虚拟主机下已经默认配置了常用类型的交换机,我们就不创建了,直接点击交换机名称,进入交换机配置。
进入 amq.direct交换机


绑定队列

通过交换机发送消息到queue-01

进入queue-01 队列获取消息

消息获取成功。

2.2.4 Routing 路由模型(交换机类型:direct)

该模式下,所有的队列的key是确定的,只有指定的key与消费者队列的key相匹配,消息才能被消费。

routing路由模式,只要指定的路由key匹配,该队列才会收到消息。
创建两个队列。

分别设置rout-queue-01 rout-queue-02的队列key,设置的这个队列key会与交换机的路由key相匹配,如果匹配成功,则对应的队列消费该消息。

进入交换机查看

发送消息

查看key所在的队列,是否有该消息。
点击rout-queue-01队列查看。

获取消息。

2.2.5 Topics 通配符模式(交换机类型:topics)

该模式是上个模式的升级版,可以对路由key进行通配符操作,满足通配符匹配的队列才能消费消息。

Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms
通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

案例:

AA.# : AA.AA AA.AA.BB
#.AA : AA.AA AA.BB.AA
#.AA.# : CC.BB.AA.BB.CC

BB.* : BB.B
*.BB : AA.BB

结合

.#.A..# : B.B.B.A.B.C.C

创建队列


配置每一个队列的路由key

配置完成后,进入交换机查看 amq.top 交换机查看绑定的数据。

发送消息,这个消息的key能发送到三个topic队列。

三个队列全部收到消息

进入队列查看消息。

发送一个消息只有topic-01 能收到。
topic-01.topic.topic-01.topic-01 将匹配到 *.topic.#

发送成功如下

消息内容

总结

通过web后台管理系统,实现了4种模式,分别是基本消息模式、发布与订阅模式、路由模式,主题模式,还剩三种,留到代码中去实践,先分析已经实践过的4中。
第一种基本消息模式:不需要交换机,直接是点对点的发布信息与消费消息(默认是有交换机的,只不过不需要我们配置)。

第二种发布与订阅模式:需要用到交换机,还有配置消费队列与指定的交换机相匹配,生产者通过交换机把消息发送到指定的消费队列中。

第三种Routing模式:需要用到交换机,需要配置消费队列与交换机绑定,还有设置交换机路由,交换机路由负责跟所有绑定了交换机的消费队列进行路由匹配,只有路由匹配成功的,才能消费到消息。

第四种Topic模式:需要用到交换机,需要配置消费队列与交换机绑定,还有设置交换机路由,交换机的路由配置改成了通配符,使用更加灵活。

剩余 work queue、 rpc远程调用、 Publisher Confirms模式在代码中说明。

第三章 代码配置

3.1 创建项目

创建一个maven项目

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.1</version>
        </dependency>
 <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.14.0</version>
        </dependency>

定义一个连接rabbitmq的工具类

public class ConnectionUtil 
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception 
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址 如果使用的guest账号,则这里要设置成localhost,或者 127.0.0.1
        factory.setHost("127.0.0.1");
        //端口 默认的端口,不能修改
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        factory.setVirtualHost("/mq");
        factory.setUsername("mqtest");
        factory.setPassword("123456");
        // 通过工厂获取连接
        return factory.newConnection();
    

3.2 基本消息模式


P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

生产者

public class Producer 

        private final static String QUEUE_NAME = "java_queue";

        public static void main(String[] argv) throws Exception 
            // 1、获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 2、从连接中创建信道,使用通道才能完成消息相关的操作
            Channel channel = connection.createChannel();
            // 3、声明(创建)队列
            //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 4、消息内容
            String message = "Hello RabbitMQ!";
            // 向指定的队列中发送消息
            //参数:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 参数明细:
             * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
             * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
             * 3、props,消息的属性
             * 4、body,消息内容
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
//            channel.close();
//            connection.close();
        


代码后面我们没有关闭工厂与信道连接,所以,在web后台管理能看到连接信息,及队列信息。


消费者

public class Consumer 

        private final static String QUEUE_NAME = "java_queue";

        public static void main(String[] argv) throws Exception 
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
            //实现消费方法
            DefaultConsumer consumer = new DefaultConsumer(channel) 
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

                /**
                 * 当接收到消息后此方法将被调用
                 *
                 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope    信封,通过envelope
                 * @param properties  消息属性
                 * @param body        消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    //交换机
                    String exchange = envelope.getExchange();
//                    System.out.println("交换机:"+exchange);
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
//                    System.out.println("信息id:"+deliveryTag);
                    // body 即消息体
                    String msg = new String(body, "utf-8");
                    System.out.println(" [x] received : " + msg + "!");
                
            ;

            // 监听队列,第二个参数:是否自动进行消息确认。
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_NAME,true,consumer);


            //关闭信道连接
           // channel.close();
            //关闭连接
            //connection.close();
        


消息生产者与消费者已创建完成,显运行生产者生产消息,在运行消费者消费消息。

3.3 消息确认机制(ACK)

自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用

两者的区别:
自动ACK机制:只要消费端收到消息,就回复消息被消费,然后删除该消息,出现异常,或者出现其他原因,只要消费者收到就回复确认,不管这个消息有没有处理完成。

手动ACK:需要手动确认消息已经被消费,如果消息消费了但是没有回复,消息会一直存在队列中,必须手动的回复才会消失在队列中。

消费者该为手动确认。

public class Consumer 

        private final static String QUEUE_NAME = "java_queue";

        public static void main(String[] argv) throws Exception 
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
            //实现消费方法
            DefaultConsumer consumer = new DefaultConsumer(channel) 
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

                /**
                 * 当接收到消息后此方法将被调用
                 *
                 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope    信封,通过envelope
                 * @param properties  消息属性
                 * @param body        消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    //交换机
                    String exchange = envelope.getExchange();
//                    System.out.println("交换机:"+exchange);
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("信息id:"+deliveryTag);
                    // body 即消息体
                    String msg = new String(body, "utf-8");
                    System.out.println(" [x] received : " + msg + "!");

                    /*
                     *  void basicAck(long deliveryTag, boolean multiple) throws IOException;
                     *  deliveryTag:用来标识消息的id
                     *  multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                     */
                    channel.basicAck(envelope.getDeliveryTag(),true);
                
            ;

            // 监听队列,第二个参数:是否自动进行消息确认。
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_NAME,false,consumer);

            //关闭信道连接
           // channel.close();
            //关闭连接
            //connection.close();
        


3.4 work queue模式

work queues与基本消息模式相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快

通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。

生产者

public class ProducerWork 

        private final static String QUEUE_NAME = "work_queue";

        public static void main(String[] argv) throws Exception 
            // 1、获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 2、从连接中创建信道,使用通道才能完成消息相关的操作
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
           for (int i=0;i<30;i++)
               // 4、消息内容
               String message = "Hello RabbitMQ:"+i;
               channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
               System.out.println(" ["+i+"] Sent '" + message + "'");
               Thread.sleep(i * 2);
           
           channel.close();
            connection.close();
        

消费者01

public class ConsumerWork01 

        private final static String QUEUE_NAME = "work_queue";
        public static void main(String[] argv) throws Exception 
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();

            channel.basicQos(1);
            //实现消费方法
            DefaultConsumer consumer = new DefaultConsumer(channel) 
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String msg = new String(body, "utf-8");
                    System.out.println(" [x] received : " + msg + "!");
                    channel.basicAck(envelope.getDeliveryTag(),t

以上是关于RabbitMQ入门笔记的主要内容,如果未能解决你的问题,请参考以下文章

利用rabbitMq的死信队列实现延时消息

RabbitMQ项目使用之死信队列

RabbitMQ项目使用之死信队列

RabbitMQ 死信队列DLX

RabbitMQ死信队列

RabbitMQ实现延迟发送消息