RabbitMQ入门笔记
Posted 在编程的路上跌跌撞撞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门笔记相关的知识,希望对你有一定的参考价值。
文章目录
第一章 安装RabbitMQ
RabbitMQ是一个消息中间件,遵循AMQP协议 (Advanced Message Queuing Protocol)。由Erlang语言开发,所以安装rabbitmq之前先安装Erlang环境。
Erlang与rabbitmq的版本必须匹配,版本匹配信息可以查看rabbit官网
官网 https://www.rabbitmq.com/which-erlang.html
1.1 安装Erlang
网址:https://www.erlang.org/downloads
右边是对应的版本信息,中间是选择下载的系统环境。
下载成功,点击安装,配置环境。
打开电脑的环境配置,变量值是erlang的安装路径。
修改Path的值,点击新建 复制 %ERLANG_HOME%\\bin
然后保存,这里的作用是为了在cmd窗口能够执行erlang的命令。
测试,win + r 打开cmd窗口,输入 erl -v
出现版本号表示安装成功。
1.2 安装 RabbitMQ
下载地址 https://www.rabbitmq.com/install-windows.html
往下滑,选择下载,这个下载资源来自GitHub,比较慢,可以找其他渠道下载。
下载安装成功之后,进入安装目录。
点击上方的目录,输入cmd 加一个空格 然后回车。
执行如下命令,安装管理插件 rabbitmq-plugins enable rabbitmq_management
执行命令 查看安装状态 rabbitmqctl status
点击运行脚本,运行服务器。
访问 后台管理地址 http://localhost:15672/#/
登录账号与密码都是 guest
登录成功如下
到此安装rabbitmq安装成功。
1.3 RabbitMQ后台管理系统
1.3.1 概述
由于当前没有进行任何配置,所以有效内容没有数据。
Totals:显示消息在单位时间内的数据情况。
Node:显示电脑的磁盘、内存等信息,选择表单的最右边加减号图标即可增加或者减少要识别的内容。
对连接,信道,队列流失统计
端口与上下文信息
配置文件导入与导出。
1.3.2 连接
信息的生产者与消费者的连接状态都在这里,由于没有创建,所以,没有任何信息。
1.3.3 信道
1.3.4 交换机
1.3.4 队列
1.3.4 admin
用于添加用户与虚拟主机。
1.4 配置用户与虚拟主机
为什么要配置用户与虚拟主机呢?
如果你是一个人开发,就你一个人那就没有问题,如果有很多人开发,他们都要查看rabbitmq时,只有一个账号是不行的,所以,就要配置一个用户,那虚拟机呢,主要是隔离每一个队列的配置。如,一个用户把自己的所有消息服务配置在一个虚拟主机当中,很容易管理各个的配置信息。
1.4.1 配置用户
如果不使用guest,我们也可以自己创建一个用户
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
添加成功如下,点击用户名配置权限。
1.4.2 新增虚拟主机
主机的名称必须是 /
开头
添加成功如下
设置虚拟主机的访问权限,如哪些用户能访问这个虚拟主机。
配置好之后,退出登录,使用刚刚配置好的用户登录虚拟主机。
退出登录
查看地址栏并登录,显示登录成功。
第二章 RabbitMQ 配置
2.1 rabbitmq 架构
组成部分说明:
- Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
- Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
- Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复
点击Tutorials进入官网教程
进入之后,有7中消息模式,下面的语言是对应的案例实现。
2.2 web后台创建消息队列
2.2.1 基本消息模式 (交换机类型:direct)
这种模式没有交换机,直接生产消息,直接消费。
创建成功,如果没有出现队列,则查看当前页面的虚拟主机属性
配置队列,点击队列名称。
发送消息,其他默认
发布成功后,概述出现一条消息
获取消息
查看概述,消息已被消费
2.2.2 work queue 消息模式
work模式属于第一种的模式升级版,可以多个消费队列去消费生产者发布的消息,而且消息只能被一个消费者消费,消费者队列属于竞争关系,该模式需要用代码实现,web后台管理暂时不实现。
2.2.3 Publish/subscribe 模式(交换机类型:Fanout)
发现虚拟主机下已经默认配置了常用类型的交换机,我们就不创建了,直接点击交换机名称,进入交换机配置。
进入 amq.direct
交换机
绑定队列
通过交换机发送消息到queue-01
进入queue-01 队列获取消息
消息获取成功。
2.2.4 Routing 路由模型(交换机类型:direct)
该模式下,所有的队列的key是确定的,只有指定的key与消费者队列的key相匹配,消息才能被消费。
routing路由模式,只要指定的路由key匹配,该队列才会收到消息。
创建两个队列。
分别设置rout-queue-01 rout-queue-02的队列key,设置的这个队列key会与交换机的路由key相匹配,如果匹配成功,则对应的队列消费该消息。
进入交换机查看
发送消息
查看key所在的队列,是否有该消息。
点击rout-queue-01队列查看。
获取消息。
2.2.5 Topics 通配符模式(交换机类型:topics)
该模式是上个模式的升级版,可以对路由key进行通配符操作,满足通配符匹配的队列才能消费消息。
Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
案例:
AA.# : AA.AA AA.AA.BB
#.AA : AA.AA AA.BB.AA
#.AA.# : CC.BB.AA.BB.CC
BB.* : BB.B
*.BB : AA.BB
结合
.#.A..# : B.B.B.A.B.C.C
创建队列
配置每一个队列的路由key
配置完成后,进入交换机查看 amq.top 交换机查看绑定的数据。
发送消息,这个消息的key能发送到三个topic队列。
三个队列全部收到消息
进入队列查看消息。
发送一个消息只有topic-01 能收到。
topic-01.topic.topic-01.topic-01
将匹配到 *.topic.#
发送成功如下
消息内容
总结
通过web后台管理系统,实现了4种模式,分别是基本消息模式、发布与订阅模式、路由模式,主题模式,还剩三种,留到代码中去实践,先分析已经实践过的4中。
第一种基本消息模式:不需要交换机,直接是点对点的发布信息与消费消息(默认是有交换机的,只不过不需要我们配置)。
第二种发布与订阅模式:需要用到交换机,还有配置消费队列与指定的交换机相匹配,生产者通过交换机把消息发送到指定的消费队列中。
第三种Routing模式:需要用到交换机,需要配置消费队列与交换机绑定,还有设置交换机路由,交换机路由负责跟所有绑定了交换机的消费队列进行路由匹配,只有路由匹配成功的,才能消费到消息。
第四种Topic模式:需要用到交换机,需要配置消费队列与交换机绑定,还有设置交换机路由,交换机的路由配置改成了通配符,使用更加灵活。
剩余 work queue、 rpc远程调用、 Publisher Confirms模式在代码中说明。
第三章 代码配置
3.1 创建项目
创建一个maven项目
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.14.0</version>
</dependency>
定义一个连接rabbitmq的工具类
public class ConnectionUtil
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址 如果使用的guest账号,则这里要设置成localhost,或者 127.0.0.1
factory.setHost("127.0.0.1");
//端口 默认的端口,不能修改
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
factory.setVirtualHost("/mq");
factory.setUsername("mqtest");
factory.setPassword("123456");
// 通过工厂获取连接
return factory.newConnection();
3.2 基本消息模式
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者
public class Producer
private final static String QUEUE_NAME = "java_queue";
public static void main(String[] argv) throws Exception
// 1、获取到连接
Connection connection = ConnectionUtil.getConnection();
// 2、从连接中创建信道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 3、声明(创建)队列
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息内容
String message = "Hello RabbitMQ!";
// 向指定的队列中发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细:
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
// channel.close();
// connection.close();
代码后面我们没有关闭工厂与信道连接,所以,在web后台管理能看到连接信息,及队列信息。
消费者
public class Consumer
private final static String QUEUE_NAME = "java_queue";
public static void main(String[] argv) throws Exception
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel)
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
/**
* 当接收到消息后此方法将被调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
//交换机
String exchange = envelope.getExchange();
// System.out.println("交换机:"+exchange);
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// System.out.println("信息id:"+deliveryTag);
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println(" [x] received : " + msg + "!");
;
// 监听队列,第二个参数:是否自动进行消息确认。
//参数:String queue, boolean autoAck, Consumer callback
/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_NAME,true,consumer);
//关闭信道连接
// channel.close();
//关闭连接
//connection.close();
消息生产者与消费者已创建完成,显运行生产者生产消息,在运行消费者消费消息。
3.3 消息确认机制(ACK)
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
两者的区别:
自动ACK机制:只要消费端收到消息,就回复消息被消费,然后删除该消息,出现异常,或者出现其他原因,只要消费者收到就回复确认,不管这个消息有没有处理完成。
手动ACK:需要手动确认消息已经被消费,如果消息消费了但是没有回复,消息会一直存在队列中,必须手动的回复才会消失在队列中。
消费者该为手动确认。
public class Consumer
private final static String QUEUE_NAME = "java_queue";
public static void main(String[] argv) throws Exception
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel)
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
/**
* 当接收到消息后此方法将被调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
//交换机
String exchange = envelope.getExchange();
// System.out.println("交换机:"+exchange);
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("信息id:"+deliveryTag);
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println(" [x] received : " + msg + "!");
/*
* void basicAck(long deliveryTag, boolean multiple) throws IOException;
* deliveryTag:用来标识消息的id
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(),true);
;
// 监听队列,第二个参数:是否自动进行消息确认。
//参数:String queue, boolean autoAck, Consumer callback
/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
//关闭信道连接
// channel.close();
//关闭连接
//connection.close();
3.4 work queue模式
work queues与基本消息模式相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。
值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。
生产者
public class ProducerWork
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] argv) throws Exception
// 1、获取到连接
Connection connection = ConnectionUtil.getConnection();
// 2、从连接中创建信道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i=0;i<30;i++)
// 4、消息内容
String message = "Hello RabbitMQ:"+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" ["+i+"] Sent '" + message + "'");
Thread.sleep(i * 2);
channel.close();
connection.close();
消费者01
public class ConsumerWork01
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] argv) throws Exception
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
channel.basicQos(1);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String msg = new String(body, "utf-8");
System.out.println(" [x] received : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(),t以上是关于RabbitMQ入门笔记的主要内容,如果未能解决你的问题,请参考以下文章