RabbitMQ快速入门
Posted 汤键.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ快速入门相关的知识,希望对你有一定的参考价值。
目录
-
MQ概述
- MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器
- 多用于分布式系统之间进行通信
- MQ,消息队列,存储消息的中间件
- 分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
- 发送方称为生产者,接收方称为消费者
-
优势
-
应用解耦
- 如果我们的应用与应用直连的话,如果一方宕机,可能会导致整个系统不能使用
- 使用MQ就可以使得应用间解耦,提升容错性和可维护性
-
异步提速
- 如下图所示
- 当下一个订单之后,订单系统需要连接库存系统,支付系统以及物流系统
- 而与每个系统交互的时间为300ms,在每得到一个响应时才能与下个系统进行交互
- 最后还要与数据库交互20ms,那么总共就是920ms
- 但是当使用MQ之后,当有用户下单时
- 只需要将订单放到MQ中,然后其它系统异步去获取,不需要等待
- 放一条消息为5ms,然后与数据库交互是20ms,那么用户感知的整个过程就只有25ms
- 其它工作都会异步执行,效率提高了很多
-
削峰填谷
- 如下图所示
- A系统每次只能处理最大1000个请求
- 但是某一瞬间请求数量突然增多,达到了每秒5000个请求
- 这时,如果用了MQ,A系统就不会直接接收到这5000个请求
- 而是自己会去MQ中每秒拿1000个请求去处理,这就叫削峰填谷
-
劣势
-
系统可用性降低
- 系统引入的外部依赖越多,系统稳定性越差
- 一旦MQ宕机,就会对业务造成影响
-
系统复杂度提高
- MQ 的加入大大增加了系统的复杂度
- 以前系统间是同步的远程调用,现在是通过MQ进行异步调用
-
一致性问题
- A系统处理完业务
- 通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败
- 如何保证消息数据处理的一致性?
-
思考
- 既然MQ有优势也有劣势
- 那么使用MQ需要满足什么条件呢?
- 1-生产者不需要从消费者处获得反馈
- 引入消息队列之前的直接调用,其接口的返回值应该为空
- 这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能
- 2-容许短暂的不一致性
- 3-确实是用了有效果
- 即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本
-
RabbitMQ
-
AMQP
- 即Advanced Message Queuing Protocol(高级消息队列协议)
- 是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计
- 基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制
- 2006年,AMQP规范发布,类比HTTP
-
RabbitMQ
- 2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布
- RabbitMQ 采用 Erlang 语言开发
- Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛
- RabbitMQ 基础架构如下图
-
RabbitMQ中的相关概念
-
Broker
- 接收和分发消息的应用
- RabbitMQ Server就是 Message Broker
-
Virtual host
- 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中
- 类似于网络中的namespace概念
- 当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost
- 每个用户在自己的 vhost 创建 exchange/queue 等
-
Connection
- publisher/consumer 和 broker 之间的 TCP连接
-
Channel
- 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低
- Channel 是在 connection 内部建立的逻辑连接
- 如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯
- AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的
- Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
-
Exchange
- 交换机,message 到达 broker 的第一站
- 根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去
- 常用的类型有:
- direct (point-to-point)–路由定向
- topic (publish-subscribe)–通配符
- fanout (multicast)–广播
-
Queue
- 消息最终被送到这里等待 consumer 取走
-
Binding
- exchange 和 queue 之间的虚拟连接
- binding 中可以包含 routing key
- Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
-
工作模式
- RabbitMQ提供了6种工作模式:
- 简单模式
- work queues
- Publish/Subscribe 发布与订阅模式
- Routing路由模式
- Topics主题模式
- RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)
-
JMS
- JMS 即 Java消息服务(JavaMessage Service)应用程序接口
- 是一个Java平台中关于面向消息中间件的API
- JMS是JavaEE规范中的一种,类比JDBC
- 很多消息中间件都实现了JMS规范
- 例如:ActiveMQ;RabbitMQ官方没有提供JMS的实现包,但是开源社区有
-
快速入门
- 在下图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者,消息的接收者,会一直等待消息到来
- queue:消息队列,图中红色部分
- 类似一个邮箱,可以缓存消息;
- 生产者向其中投递消息,消费者从其中取出消息
- 需求: 使用简单模式完成消息传递
- 步骤:
- (1)创建工程(生成者、消费者),分别添加依赖
- (2)编写生产者发送消息
- (3)编写消费者接收消息
- 添加Virtual Hosts
-
生产者Moudle
- pom.xml
-
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>RabbitMQ-producer</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!--rabbitmq java 客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies> </project>
- 生产者类
-
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author Tang * @version 2022.2 * @code 2023/1/10 19:07 */ //发送消息 public class Producer_hello public static void main(String[] args) throws Exception //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.11.11"); //ip 默认值localhost factory.setPort(5672); //端口 默认值5672 factory.setVirtualHost("/icpc"); //虚拟机 默认值/ factory.setUsername("admin");//用户名 默认guest factory.setPassword("admin");//密码 默认值guest //3.创建连接 Connection Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列Queue /* 【API说明】: queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: (1)String queue:队列名称。如果当前队列名称不存在,则会创建该队列,否则不会创建 (2)boolean durable:是否持久化。即队列在服务器重启后是否还存在 (3)boolean exclusive:是否独占。只能有一个消费者监听这队列;当Connection关闭时,是否删除队列 (4)boolean autoDelete:是否自动删除。如果为true,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,队列会自动删除。 (5)Map<String, Object> arguments:附带参数。队列的其他属性,例如:x-message-ttl、x-expires、x-max-length、x-maxlength-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority */ channel.queueDeclare("helloworld_queue",true,false,false,null); //6.发送消息 /* * 【API说明】: * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 参数: * (1)String exchange:交换机名称。简单模式下交换机会使用默认的 "" * (2)String routingKey:路由名称 * (3)BasicProperties props:配置信息 * (4)byte[] body:发送消息数据 */ String body = "hello rabbitmq~~~"; channel.basicPublish("","helloworld_queue",null,body.getBytes()); //7.释放资源 channel.close(); connection.close();
-
消费者Moudle
- pom.xml
- 消费者类
-
import com.rabbitmq.client.*; /** * @author Tang * @version 2022.2 * @code 2023/1/10 19:12 */ public class Consumer_hello public static void main(String[] args) throws Exception //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置参数 factory.setHost("192.168.11.11"); //ip 默认值localhost factory.setPort(5672); //端口 默认值5672 factory.setVirtualHost("/icpc"); //虚拟机 默认值/ factory.setUsername("admin");//用户名 默认guest factory.setPassword("admin");//密码 默认值guest //3.创建连接 Connection Connection connection = factory.newConnection(); //4.创建Channel Channel channel = connection.createChannel(); //5.创建队列Queue /* 【API说明】: queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: (1)String queue:队列名称。如果当前队列名称不存在,则会创建该队列,否则不会创建 (2)boolean durable:是否持久化。即队列在服务器重启后是否还存在 (3)boolean exclusive:是否独占。只能有一个消费者监听这队列;当Connection关闭时,是否删除队列 (4)boolean autoDelete:是否自动删除。如果为true,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,队列会自动删除。 (5)Map<String, Object> arguments:附带参数。队列的其他属性,例如:x-message-ttl、x-expires、x-max-length、x-maxlength-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority */ channel.queueDeclare("helloworld_queue",true,false,false,null); //6.接收信息 Consumer consumer = new DefaultConsumer(channel) /* * 回调方法,当收到消息后,会自动执行该方法 * void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) * 参数: * (1)String consumerTag:标识 * (2)Envelope envelope:获取一些信息,交换机,路由key... * (3)AMQP.BasicProperties properties:配置信息 * (4)byte[] body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) System.out.println("consumerTag:" + consumerTag); System.out.println("Exchange:" + envelope.getExchange()); System.out.println("RoutingKey:" + envelope.getRoutingKey()); System.out.println("properties:" + properties); System.out.println("body:" + new String(body)); ; /* * 【API说明】: * basicConsume(String queue, boolean autoAck, Consumer callback) * 参数: * (1)String queue:队列名称 * (2)boolean autoAck:是否自动确认 * (3)Consumer callback:回调对象 */ channel.basicConsume("helloworld_queue", true, consumer); //7.关闭资源(消费者需要监听生产者消息,这里不关闭)
-
测试
- (1)先启动消费者类,消费监听生产者消息
- (2)再启动生产者类生产消息,消费者消费消息,即控制台输出:
以上是关于RabbitMQ快速入门的主要内容,如果未能解决你的问题,请参考以下文章