学习笔记编程不良人老师的RabbitMQ教程的学习笔记
Posted 棉花糖灬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学习笔记编程不良人老师的RabbitMQ教程的学习笔记相关的知识,希望对你有一定的参考价值。
本文是B站up主“编程不良人”的RabbitMQ教程的学习笔记,up用的是CentOS,而我平常用Ubuntu比较多,所以本文是基于Ubuntu来操作的。此外貌似RabbitMQ需要root用户权限,所以以下所有的RabbitMQ相关的命令最好带有sudo
,否则可能会报错。此外,如果队列和交换机重复创建时也会报错,在网页端删除即可。
一、消息队列MQ
1. 是什么是MQ
MQ(Message Queue) : 翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
2. MQ有哪些
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ
等。
3. 不同MQ的特点
(1) ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
(2) Kafka
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
(3) RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
(4) RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
二、初识RabbitMQ
基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。RabbitMQ的官网为https://www.rabbitmq.com/
。
1. AMQP 协议
AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:
如图所示,生产者(Publisher)将消息发送给虚拟主机(Virtual host)中的交换机(Exchange),交换机和消息队列(Message Queue)之间有绑定关系,消费者(Consumer)通过消息队列来消费消息。
2. RabbitMQ的安装
Ubuntu安装RabiitMQ,参考了文章Ubuntu16.04 18.04 安装rabbitmq 配置、使用详细教程。
-
sudo apt-get install erlang-nox
:因为RabbitMQ是用erlang语言开发的,所以要先安装好语言环境 -
sudo apt-get install rabbitmq-server
:安装RabbitMQ -
sudo rabbitmq-plugins enable rabbitmq_management
:启动RabbitMQ中的插件管理 -
systemctl start rabbitmq-server
:启动RabbitMQ -
由于RabbitMQ默认占用的是15672端口,所以在浏览器访问
localhost:15672
,显示如下界面登录的用户名和密码都是
guest
,登录后页面如下:
安装好之后,RabbitMQ配置文件的位置为/etc/rabbitmq/rabbitmq-env.conf
。在使用时需要关闭防火墙:
systemctl disable firewalld
systemctl stop firewalld
三、RabiitMQ配置
1. RabbitMQ管理命令行
(1) 服务启动相关
systemctl start rabbitmq-server
:启动RabbitMQsystemctl restart rabbitmq-server
:重启RabbitMQsystemctl stop rabbitmq-server
:停止RabbitMQsystemctl status rabbitmq-server
:查看RabbitMQ的状态
(2) 管理命令行
rabbitmqctl help
:查看更多命令,用来在不使用web管理界面情况下命令操作RabbitMQ
(3) 插件管理命令行
rabbitmq-plugins enable
:启动插件rabbitmq-plugins list
:列出所有插件rabbitmq-plugins disable
:关闭插件
2. Web管理界面
(1) Overview概览
connections
:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况channels
:通道,建立连接后,会形成通道,消息的投递获取依赖通道Exchanges
:交换机,用来实现消息的路由Queues
:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列
(2) Admin用户和虚拟主机管理
i) 添加用户
上面的Tags选项,其实是指定用户的角色,可选的有以下几个:
-
超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作
-
监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
-
策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)
-
普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理
-
其他:无法登陆管理控制台,通常就是普通的生产者和消费者
ii) 创建虚拟主机
iii) 绑定虚拟主机和用户
创建好虚拟主机,我们还要给用户添加访问权限:
点击添加好的虚拟主机:
进入虚拟机设置界面
四、RabbitMQ的第一个程序
1. AMQP协议回顾
生产者通过通道发送消息,每个生产者对应一个虚拟主机,需要将虚拟主机和用户绑定之后才有访问权限。消息要不要放到交换机中取决于所使用的消息模型,消息不放到交换机时会直接放到消息队列中。消费者和生产者是解耦的,它只关心消息队列中有没有相应的消息,消费者消费消息时也需要连接虚拟主机。
2. AMQP支持的消息模型
本文不涉及第6种模型,并且有一种新出的模型,也不涉及。
3. 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
4. 第一种模型(直连)
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
该模型中只有一个生产者和一个消费者,生产者将消息发送到消息队列,生产者对消息队列进行监听,从消息队列取出消息进行消费。
(1) 在网页创建虚拟主机和用户
在Admin选项中
- 创建名为
/ems
虚拟主机,虚拟主机都是以/
开头的 - 创建名为
ems
的用户 - 点击
ems
的用户名与/ems
虚拟主机进行绑定
(2) 开发生产者
package HelloWorld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import utils.MQConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
// 生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 创建连接MQ的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接RabbitMQ主机
connectionFactory.setHost("192.168.114.129");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 获取连接
Connection connection = connectionFactory.newConnection();
// 获取连接中通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
// 参数:队列名称、队列是否持久化(重启RabbitMQ后队列是否还存在,消息仍会丢失)、是否独占队列、是否在消费完成后自动删除队列、额外附加参数
channel.queueDeclare("hello", false, false, false, null);
// 发布消息
// 参数:交换机名称、队列名称、传递消息额外设置、消息的具体内容
// MessageProperties.PERSISTENT_TEXT_PLAIN:重启RabbitMQ后消息仍会存在
channel.basicPublish("","hello",null,"hello RabbitMQ".getBytes());
// 关闭通道和连接
channel.close();
connection.close();
}
}
注意需要关闭RabbitMQ所在系统的防火墙,否则会报错。
(3) 开发消费者
package HelloWorld;
import com.rabbitmq.client.*;
import utils.MQConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 注意需要是main函数,因为消费者要一直监听
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接MQ的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接RabbitMQ主机
connectionFactory.setHost("192.168.114.129");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 获取连接
Connection connection = connectionFactory.newConnection();
// 获取连接中通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
// 参数:队列名称、队列是否持久化、是否独占队列、是否在消费完成后自动删除队列、额外附加参数
// 要保证生产者和消费者队列的参数一致
channel.queueDeclare("hello", false, false, false, null);
// 参数:队列名、开始消息时的自动确认机制、费消息时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
// body:从队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Message: " + new String(body));
}
});
}
}
(4) 封装工具类
将生产者和消费者中重复的代码封装为工具类
package Utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConnection {
// 创建连接MQ的连接工厂对象
public static ConnectionFactory connectionFactory;
// 静态代码块中的内容只在类加载的时候执行一次
static {
connectionFactory = new ConnectionFactory();
}
// 提供连接对象的方法
public static Connection getConnection() {
try {
// 设置连接RabbitMQ主机
connectionFactory.setHost("192.168.114.129");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 获取连接
return connectionFactory.newConnection();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
// 关闭通道和连接的方法
public static void closeChannelAndConnection(Channel channel, Connection connection) throws IOException, TimeoutException {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
5. 第二种模型(work queue)
Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
- P:生产者:任务的发布者
- C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
- C2:消费者-2:领取任务并完成任务,假设完成速度快
(1) 开发生产者
package workqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.MQConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接对象
Connection connection = MQConnection.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 10; i++) {
// 生产消息
channel.basicPublish("", "work", null, (i + " hello work queque").getBytes());
}
// 关闭资源
MQConnection.closeChannelAndConnection(channel, connection);
}
}
(2) 开发消费者1
package workqueue;
import com.rabbitmq.client.*;
import utils.MQConnection;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = MQConnection.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
(3) 开发消费者2
package workqueue;
import com.rabbitmq.client.*;
import utils.MQConnection;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = MQConnection.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
(4) 测试结果
先运行Consumer1和Consumer2,再运行Provider。默认RabbitMQ将按顺序将每个消息发送给下一个使用者,无论两个消费者对消息的处理速度是否一致,其能消耗的消息数都是平均分配的,这种分发消息的方式称为循环。
(5) 消息自动确认机制
当多个消费者处理消息的速度不同时,可以关闭自动确认并设置每次能消费的消息个数来实现能者多劳。
package workqueue;
import com.rabbitmq.client.*;
import utils.MQConnection;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = MQConnection.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 每次只能消费1个消息
channel.basicQos(1);
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
// 关闭自动确认,需要手动确认
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2:" + new String(body));
// 手动确认
// 参数:确认队列中哪个具体消息、是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
6. 第三种模型(fanout)
fanout:扇出,也称为广播
在广播模式下,消息发送流程是这样的:
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有
以上是关于学习笔记编程不良人老师的RabbitMQ教程的学习笔记的主要内容,如果未能解决你的问题,请参考以下文章