一文入门RabbitMQ消息队列
Posted 微笑pro
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文入门RabbitMQ消息队列相关的知识,希望对你有一定的参考价值。
本文是基于B站-黑马程序员发布的RabbitMQ教程所记录(高级部分后续补上),仅供学习参考,如果需要更详细的文档,请移步官网哦
1、MQ基本概念
1、概述
MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。
即:
- MQ,消息列表,存储消息的中间件
- 分布式系统通信的两种方式:直接远程调用和借助第三方完成间接通信
- 发送方称为生产者,接收方称为消费者。
2、MQ的优势和劣势
优势
-
应用解耦:使用MQ使得应用间解耦,提升容错性和可维护性。举个例子,如订单系统需要调用库存系统、支付系统、物流系统完成服务,如果通过远程调用,二者就耦合在一起了,如果使用MQ,订单系统将消息发送到MQ,其它系统去MQ中取出消息进行消费即可,大大降低了模块之间的耦合。且如果需要加入新系统,原订单系统也无需修改代码,只需要让新模块同样去MQ中取出订单系统的消息消费即可。
-
异步提速:使用MQ提升用户体验和系统吞吐量。举个例子,如用户下订单,根据传统的执行流程,首先需要花费300ms调用库存系统,再花费300ms调用支付系统,再花费300ms调用物流系统,最后花费20ms写入数据库。这样依赖就花费了大约1s时间,较为影响用户体验,因此,如果使用MQ,则用户下完订单,花费20ms写入数据库,花费5ms将消息发送至MQ中,然后返回成功消息,一共花费25ms,而其它系统自行取消息执行,时间不计入。这样依赖就大大降低了用户等待时间,提升了响应速度。
-
削峰填谷:使用MQ提高系统稳定性。举个例子,如某网站推出十二点准时一元抢家电活动,等到十二点,一瞬间有五千个请求涌入,如果直接用服务器去接收这些流量,服务器会瞬间宕机,严重影响用户体验。如果使用MQ,作为中间间缓存请求,即让请求首先访问的是MQ,而不是直接打到服务器,然后让服务器去按处理阈值去MQ中取请求进行处理,这样服务器就能正常工作,这就是MQ削峰,服务器填谷。
劣势
- 系统可用性降低:传统方式使用远程调用只要考虑两个系统的可用性,如果使用MQ,则还需要考虑MQ的工作状态。即系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成严重影响,所以还需要考虑MQ的高可用。
- 系统复杂度提高:使用了MQ,还需要考虑很多可能发生的问题,如如何保证消息没有被重复消费,如何处理消息丢失情况,如何保证消息传递的顺序性等等问题,对这些问题的处理将大大增加系统的复杂度。
- 一致性问题:举个例子,如A系统处理完业务,通过MQ给B,C,D三个系统发送消息数据,如果B系统、C系统处理成功,D系统处理失败,这个就需要探讨一致性问题的处理,如何保证数据消息处理的一致性问题。
总结
使用MQ既有优势也有劣势,使用MQ最好满足以下条件再考虑使用:
- 生产者不需要从消费者处获得反馈。即使用MQ允许异步的条件是生产者发送完消息后不需要考虑消费者的返回值,这才让所谓异步称为可能。
- 容许短暂的不一致性。
- 使用后效益大于成本。即解耦、提速、削峰这些方面的收益,超过加入MQ带来的系统成本。
3、常见的MQ产品
目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,当然,也有直接使用Redis充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里巴巴 | Apache |
开发语言 | Erlang | Java | Java | Scale&Java |
协议支持 | AMOP、XMPP、SMTP、STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义 | 自定义协议,社区封装了HTTP协议支持 |
客户端支持语言 | 官方支持Erlang、Java、Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,php,perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python |
单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
功能特性 | 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
4、RabbitMQ简介
介绍RabbitMQ前,先介绍下AMQP协议,即Advanced Message Queuing Protocol(高级消息队列协议)是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计规范。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制,该协议规范于2006年发布。
在2007年,Rabbit公司基于AMQP标准开发的RabbitMQ1.0发布。RabbitMQ采用Erlang语言开发。架构如下:
其中的相关概念为:
- Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
- Virtual host:基于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等。
- Connection:publisher / consumer 和 broker之间的TCP连接。
- Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通信,AMQP method包含了channel id帮助客户端和message broker 识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
- Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(public-subscribe),fanout(multicast)。
- Queue:消息最终被送到这里等待consumer取走。
- Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
RabbitMQ提供了六种工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topic主题模式、RPC远程调用模式(远程调用,不太算MQ,不作介绍)。
2、RabbitMQ的安装和配置
3、RabbitMQ简单模式入门
使用 Java 代码测试一下RabbitMQ的简单模式。首先在项目中引入RabbitMQ的客户端依赖(操作RabbitMQ),并且RabbitMQ的服务要启动起来。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
然后根据MQ架构来编写生产者相关代码:
1、生产者代码
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("192.168.xx.xx");//ip地址 默认值为Localhost(不设置情况下)
factory.setPort(5672);//端口 默认值为5672
factory.setVirtualHost("/");//设置虚拟机 默认为/
factory.setUsername("guest");//用户名 默认为guest
factory.setPassword("guest");//密码 默认为guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建队列Queue
/*
* 队列创建方法: queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguements):没有该队列时自动创建该队列
* 参数:
* 1、queue: 队列名称
* 2、durable: 是否持久化,即当mq重启之后,其中的消息还在
* 3、exclusive: 是否独占,即是否只能有一个消费者监听这队列,当Connection关闭时,是否删除队列。
* 4、autoDelete: 是否自动删除。即当没有Consumer时,自动删除队列。
* 5、arguments: 删除的一些参数
*/
channel.queueDeclare("hello_world", true, false, false, null);
//6、发送消息
/*
* 消息发送方法: basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数:
* 1、exchange: 交换机名称。简单模式下交换机会使用默认的""
* 2、routingKey: 路由名称。
* 3、props: 配置信息
* 4、body: 发送消息数据
*
*/
channel.basicPublic("", "hello_world", null, "hello rabbitmq".getBytes());
//7、释放资源
channel.close();
connection.close();
2、消费者代码
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("192.168.xx.xx");//ip地址 默认值为Localhost(不设置情况下)
factory.setPort(5672);//端口 默认值为5672
factory.setVirtualHost("/");//设置虚拟机 默认为/
factory.setUsername("guest");//用户名 默认为guest
factory.setPassword("guest");//密码 默认为guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、接收消息
//创建回调对象在接收到消息后进行方法回调
Consumer consumer = new DefaultConsumer(channel)
/*
* 覆写回调方法,当受到消息后,会自动执行该方法
* 参数:
* 1、consumerTag: 唯一标识
* 2、envelope: 获取一些信息,交换机,路由Key...
* 3、properties: 配置信息
* 4、body: 消息数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
/*
* 监听方法: basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue: 队列名称
* 2、autoAck: 是否自动确认
* 3、callback: 回调对象
*/
channel.basicConsume("hello_world", true, consumer);
//释放资源?不用
4、RabbitMQ的工作模式
上面已经对简单模式进行了测试。不难看出,RabbitMQ的各种工作模式其实就是消息的路由策略和分发方式不一样。
1、Work queues 工作队列模式
Work queues工作队列模式相较于简单模式,只是在多了一个或多个消费者,这些消费者竞争同一个队列中的消息。对于任务较多情况或任务过重的情况下使用工作队列可以提高任务处理的速度。
2、Pub / Sub 订阅模式
Pub / Sub订阅模型中,多了一个Exchange角色(其实其它模式也有,只不过使用的是默认交换机),在使用时与简单模式和工作队列模式略有区别。
如上图中,简单介绍下:
-
P:生产者,也就是要发送消息的程序,但是不再直接发送消息到队列中,而是发给X(交换机)
-
C:消费者,消息的接收者,会一直监听队列等待消息。
-
Queue:消息队列,接收消息,缓存消息。
-
Exchange:交换机(X),一方面,接收生产者发送的消息。另一方面,直到如何处理消息,例如递交给某个特定队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange常见类型有以下三种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,并不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会被丢失。
1、生产者代码(Fanout模式)
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("192.168.xx.xx");//ip地址 默认值为Localhost(不设置情况下)
factory.setPort(5672);//端口 默认值为5672
factory.setVirtualHost("/");//设置虚拟机 默认为/
factory.setUsername("guest");//用户名 默认为guest
factory.setPassword("guest");//密码 默认为guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建交换机
/*
* 创建交换机方法: exchangeDeclare(String exchange, BuiltExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguements)
* 参数:
* 1、exchange: 交换机名称
* 2、type: 交换机类型(枚举)
* DIRECT("direct"): 定向
* FANOUT("fanout"): 扇形(广播),发送消息到每一个与之绑定队列
* TOPIC("topic"): 通配符的方式
* HEADERS("headers"): 参数匹配
* 3、durable: 是否持久化
* 4、autoDelete: 自动删除
* 5、internal: 内部使用。一般为false。
* 6、arguments: 参数
*/
channel.exchangeDeclare("test_fanout", BuiltExchangeType.FANOUT, true, false, false, null);
//6、创建队列Queue
/*
* 队列创建方法: queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguements):没有该队列时自动创建该队列
* 参数:
* 1、queue: 队列名称
* 2、durable: 是否持久化,即当mq重启之后,其中的消息还在
* 3、exclusive: 是否独占,即是否只能有一个消费者监听这队列,当Connection关闭时,是否删除队列。
* 4、autoDelete: 是否自动删除。即当没有Consumer时,自动删除队列。
* 5、arguments: 删除的一些参数
*/
channel.queueDeclare("test_fanout_queue1", true, false, false, null);
channel.queueDeclare("test_fanout_queue2", true, false, false, null);
//7、绑定队列和交换机
/*
* 绑定方法: queueBind(String queue, String exchange, String routingKey)
* 参数:
* 1、queue: 队列名称
* 2、exchange: 交换机名称
* 3、routingKey: 路由键,即绑定规则。如果交换机的类型为fanout,routingKey无论怎么设置都会给绑定的队列发送消息。
*/
channel.queueBind("test_fanout_queue1", "test_fanout", "");
channel.queueBind("test_fanout_queue2", "test_fanout", "");
//6、发送消息
/*
* 消息发送方法: basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数:
* 1、exchange: 交换机名称。简单模式下交换机会使用默认的""
* 2、routingKey: 路由名称。
* 3、props: 配置信息
* 4、body: 发送消息数据
*
*/
channel.basicPublic("test_fanout", "", null, "hello rabbitmq".getBytes());
//7、释放资源
channel.close();
connection.close();
2、消费者代码
多个消费者的话监听不同队列即可。
消费者1
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("192.168.xx.xx");//ip地址 默认值为Localhost(不设置情况下)
factory.setPort(5672);//端口 默认值为5672
factory.setVirtualHost("/");//设置虚拟机 默认为/
factory.setUsername("guest");//用户名 默认为guest
factory.setPassword("guest");//密码 默认为guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、接收消息
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
System.out.println("body:" + new String(body));
System.out.println("将数据保存到数据库...");
channel.basicConsume("test_fanout_queue1", true, consumer);
//释放资源?不用
消费者2
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("192.168.xx.xx");//ip地址 默认值为Localhost(不设置情况下)
factory.setPort(5672);//端口 默认值为5672
factory.setVirtualHost("/");//设置虚拟机 默认为/
factory.setUsername("guest");//用户名 默认为guest
factory.setPassword("guest");//密码 默认为guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、接收消息
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
System.out.println("body:" + new String(body));
System.out.println("将消息日志打印到控制台...");
channel.basicConsume("test_fanout_queue2", true, consumer);
//释放资源?不用
3、Routing工作模式
Routing工作模式是基于routingKey来工作的,比如在发送消息到交换机前要指定routingKey,然后交换机通过routingKey发送到不同的队列中(队列在创建时可以指定自己的routingKey),让不同消费者消费。
1、direct交换机模式
根据发送的 routingKey发送到指定routingKey的队列中。
生产者:
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("192.168.xx.xx");//ip地址 默认值为Localhost(不设置情况下)
factory.setPort(5672);//端口 默认值为5672
factory.setVirtualHost("/");//设置虚拟机 默认为/
factory.setUsername("guest");//用户名 默认为guest
factory.setPassword("guest");//密码 默认为guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建交换机
/*
* 创建交换机方法: exchangeDeclare(String exchange, BuiltExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguements)
* 参数:
* 1、exchange: 交换机名称
* 2、type: 交换机类型(枚举)
* DIRECT("direct"): 定向
* FANOUT("fanout"): 扇形(广播),发送消息到每一个与之绑定队列
* TOPIC("topic"): 通配符的方式
* HEADERS("headers"): 参数匹配
* 3、durable: 是否持久化
* 4、autoDelete: 自动删除
* 5、internal: 内部使用。一般为false。
* 6、arguments: 参数
*/
channel.exchangeDeclare("test_direct", BuiltExchangeType.DIRECT, true, false, false, null);
//6、创建队列Queue
/*
* 队列创建方法: queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguements):没有该队列时自动创建该队列
* 参数:
* 1、queue: 队列名称
* 2、durable: 是否持久化,即当mq重启之后,其中的消息还在
* 3、exclusive: 是否独占,即是否只能有一个消费者监听这队列,当Connection关闭时,是否删除队列。
* 4、autoDelete: 是否自动删除。即当没有Consumer时,自动删除队列。
* 5、arguments: 删除的一些参数
*/
channel.queueDeclare("test_direct_queue1", true, false, false, null);
channel.queueDeclare("test_direct_queue2", true, false, false, null);
//7、绑定队列和交换机
/*
* 绑定方法: queueBind(String queue, String exchange, String routingKey)
* 参数:
* 1、queue: 队列名称
* 2、exchange: 交换机名称
* 3、routingKey: 路由键,即绑定规则。如果交换机的类型为fanout,routingKey无论怎么设置都会给绑定的队列发送消息。
*/
//queue1和direct交换机绑定routingKey为error
channel.queueBind("test_direct_queue1", "test_direct", "error");
//queue2和direct交换机绑定routingKey为info,error,warning
channel.queueBind("test_direct_queue2", "test_direct", "info");
channel.queueBind("test_direct_queue2", "test_direct", "error");
channel.queueBind("test_direct_queue2", "test_direct", "warning");
//6、发送消息
/*
* 消息发送方法: basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数:
* 1、exchange: 交换机名称。简单模式下交换机会使用默认的""
* 2、routingKey: 路由名称。
* 3、props: 配置信息
* 4、body: 发送消息数据
*
*/
channel.basicPublic("test_direct", "error", null, "error rabbitmq".getBytes());
channel.basicPublic("test_direct", "info", null, "info rabbitmq".getBytes());
//7、释放资源
channel.close();
connection.close();
2、Topic交换机模式
通过Topic通配符方式绑定队列,规则是队列1定义自己的routingKey为a.b.c。Topic交换机通过通配符绑定队列,即通过***,#的方式进行匹配,表示不多不少只匹配一个单词,而#表示能够匹配一个或多个单词*。
生产者:
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("192.168.xx.xx");//ip地址 默认值为Localhost(不设置情况下)
factory.setPort(5672);//端口 默认值为5672
factory.setVirtualHost("/");//设置虚拟机 默认为/
factory.setUsername("guest");//用户名 默认为guest
factory.setPassword("guest");//密码 默认为guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建交换机
/*
* 创建交换机方法: exchangeDeclare(String exchange, BuiltExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguements)
* 参数:
* 1、exchange: 交换机名称
* 2、type: 交换机类型(枚举)
* DIRECT("direct"): 定向
* FANOUT("fanout"): 扇形(广播),发送消息到每一个与之绑定队列
* TOPIC("topic"): 通配符的方式
* HEADERS("headers"): 参数匹配
* 3、durable: 是否持久化
* 4、autoDelete: 自动删除
* 5、internal: 内部使用。一般为false。
* 6、arguments: 参数
*/
channel.exchangeDeclare("test_topic", BuiltExchangeType.TOPIC, true, false, false, null);
//6、创建队列Queue
/*
*以上是关于一文入门RabbitMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章