分布式消息通信之RabbitMQ_01
Posted qkxh320
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式消息通信之RabbitMQ_01相关的知识,希望对你有一定的参考价值。
目录
官网
? RabbitMQ
? RabbitMQ Tutorials & 1 Hello World! | 2 Work queues | 3 Publish/Subscribe | 4 Routing | 5 Topics | 6 RPC & RabbitMQ 教程
? Installation Guide
1. RabbitMQ安装
1.1 Window版安装
?a.需要首先安装安装Erlang
以及对应版本的rabbitmq-server
,可在此处Installing on Windows下载;
?本次安装版本为rabbitmq-server-3.7.14.exe
及OTP 21.3 Windows 64-bit
;下载完成后一直next即可;可将erl
和rabbitMQ
加入环境变量;
?b.安装完成后启用rabbitmq_management
C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.7.14\\sbin>enable rabbitmq_management
'enable' 不是内部或外部命令,也不是可运行的程序
或批处理文件。
C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.7.14\\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node [email protected]:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to [email protected]
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.7.14\\sbin>
?c.之后启动RabbitMQ,net start RabbitMQ
C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.7.14\\sbin>rabbitmq-server.bat net start RabbitMQ
"WARNING: Using RABBITMQ_ADVANCED_CONFIG_FILE: C:\\Users\\EDDY~1.SHE\\AppData\\Roaming\\RabbitMQ\\advanced.config"
## ##
## ## RabbitMQ 3.7.14. Copyright (C) 2007-2019 Pivotal Software, Inc.
########## Licensed under the MPL. See https://www.rabbitmq.com/
###### ##
########## Logs: C:/Users/EDDY~1.SHE/AppData/Roaming/RabbitMQ/log/RABBIT~1.LOG
C:/Users/EDDY~1.SHE/AppData/Roaming/RabbitMQ/log/[email protected]_upgrade.log
Starting broker...
completed with 3 plugins.
?启动成功后可打开http://localhost:15672/#/测试地址查看,用户名密码默认均为guest
1.2 Linux版安装
2. 典型应用场景
?异步,解耦,削峰
? 跨系统的异步通信;
?系统内的同步变为异步;
?基于pub/sub模型的广播订阅;
?分布式事物的最终一致性解决方案;
3. 基本介绍
3.1 AMQP协议
?AMQP协议,跨语言,跨系统,跨平台的协议,Advanced Message Queuing Protocol 一种高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的消息中间件可跨产品,跨语言的消息通信。Erlang中的实现有 RabbitMQ等。
3.2 RabbitMQ的特性
?RabbmitMQ使用Erlang语言编写,使用Mnesia数据库存储消息。
?1)可靠性(Reliability) RabbitMQ使用一些机制来保证消息传输的可靠性,如持久化,传输确认,发布确认。
?2)灵活的路由(Flexible Routing) 在消息进入队列之前,通过Exchange来路由消息。对应典型的路由功能,RabbmitMQ已经内置了一些Exchange来支持。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也可以通过插件机制实现自己的Exchange。
?3) 消息集群(Clustering) 多个RabbitMQ服务可以组成一个集群,形成一个逻辑Broker。
?4) 高可用队列(Highly Available Queues) 队列可以在集群机器上进行镜像,使得在部分节点出现问题的情况下队列让然可用。
?5) 多种协议(Multi Protocol) RabbitMQ支持多种消息队列协议,如 AMQP, STOMP, MQTT等。
?6) 多语言客户端(Clients) RabbitMQ几乎支持所有常用语言,如 Python, Java, Ruby, php, C#, javascript, Go, Elixir, Objective-C等。
?7) 管理界面(Management UI) RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息,集群中的节点。
?8) 插件机制(Plugin) RabbitMQ提供了许多插件,以实现多方面的扩展,当然也可以编写自己的插件。
3.3 工作模型
?工作流程:
?1. 首先生产者将消息发布至交换机Exchange
,绑定路由关键字Routing Key
(生产者不会讲消息直接放入队列,就算是不声明交换机,也是先发布至默认的交换机)
?2. 消费者创建通道channel
,队列queue
,并指定队列queue
绑定的交换机Exchange
,使用绑定关键字Binding Key
来绑定(一个交换机可绑定多个队列queue,一个交换机绑定一个队列时,也可以使用多个绑定关键字)
?3. 当交换机收到消息,根据生产者发送消息时的路由关键字routing key
和消费者定义的绑定关键字Binding Key
做匹配,匹配合适的话就将消息存入匹配到的队列中去
?4. 消费者从队列中取走消息
?关键字:
?Broker
RabbitMQ的实体服务器,提供一种传输及服务,维护一条从生产者到消费者的传输路线,保证消息按指定的方式传输。
?Exchange
消息交换机,指定消息按照哪种规则路由到哪个队列Queue。
?Queue
消息队列。消息的载体,每个消息都会被投送到一个或多个队列中。
?Binding
绑定。将Exchange和Queue按照某种路由规则绑定。
?Routing Key
路由关键字。和 Binding Key对应,按照关键字匹配通过Exchange找到对应的Queue。
?VHost
虚拟主机。相当于一个小的Broker,小的rabbitMQ服务器,一个Broker可以有一到多个虚拟主机,用于不同用户的权限分离。一个虚拟主机拥有一组 Exchange, Queue和 Binding。
?Producer
消息生产者。将消息投递到Exchange上,一般是独立的应用程序。
?Consumer
消息消费者。消息的接受者,一般是独立的应用程序。
?Connection
Producer,Broker和Consumer之间的TCP长连接。
?Channel
消息通道,也成信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话认为。在RabbitMQ Java Client中,channel上定义了大量的编程接口。
3.4 三种主要的交换机
?Direct Exchange 直连交换机
?使用直连类型的交换机时,发送消息时的路由关键字Routing Key
和接收消息时的绑定关键字Binding Key
必须完全匹配时,生产者发送的消息才能被对应的消费者所接收到;
?Topic Exchange 主题交换机
?路由关键字Routing Key
和绑定关键字Binding Key
不需要完全匹配,类似于正则表达式符合一定规则匹配到即可接收消息。通配符有两个:
* 代表匹配一个单词;
# 代表匹配零到多个单词;
多个单词之间用.分隔。
?Fanout Exchange 广播交换机
?使用广播类型的交换机时,不需要指定Routing Key
和Binding Key
,生成者将消息发送至该交换机后,所有与之绑定的队列都能收到消息。
4. Java API编程
?Demo
// 声明交换机
com.rabbitmq.client.Channel#exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable)
// 声明队列
com.rabbitmq.client.Channel#queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
// 发布消息
com.rabbitmq.client.Channel#basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
?参数说明
声明交换机
?exchange
: 交换机名称
?type
: 交换机类型,DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers"); 4种,也可以直接使用字符串表示
?durable
: 是否持久化,代表交换机在服务器重启后是否存在
声明队列
?String queue
: 队列名称
?boolean durable
: 是否持久化,代表队列在服务器重启后是否存在
?boolean exclusive
: 是否排他队列,如果要声明一个只有自己可见,其余用户都不可见的队列,可以使用排他队列;该队列有两个特性:1)
只有首次声明它的连接(connection)可见(对connection中的多个channel也可见),2)
会在连接断开时自动删除
?boolean autoDelete
:是否自动删除,如果为true,在没有消费者连接到这个队列时,会自动删除
?Map<String, Object> arguments
:队列的其他属性,例如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority;
发布消息
?BasicProperties props,
:14个属性
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String,Object> headers; // 消息的其他自定义参数
private Integer deliveryMode; // 2持久化,其他 瞬态
private Integer priority; // 消息的优先级
private String correlationId;
private String replyTo; // 回调队列
private String expiration; // TTL 消息过期时间,单位毫秒
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
...
5. 进阶知识
5.1 TTL (Time To Live)
?可以通过设置队列的过期时间或者每条消息的过期时间来实现:
// 队列设置过期时间
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-message-ttl", 60000);
channel.queueDeclare("MY_TTL_QUEUE", true, false, false, queueArgs);
...
// 消息设置过期时间
AMQP.BasicProperties msgArgs = new AMQP.BasicProperties().builder()
.expiration("5000") // TTL
.deliveryMode(2) // 消息持久化
.contentEncoding("UTF-8")
.build();
channel.basicPublish("", "MY_TTL_QUEUE", msgArgs, "Hello World".getBytes());
5.2 死信队列,死信交换机
?三种情况消息会进入死信交换机DLX(Dead Letter Exchange):
?消息过期;
?消费端设置autoAck为false,不使用自动应答而是使用手工应答,并且手工应答的处理是reject或者Nack,且requeue属性为false,被拒绝的消息不会重新入队的时候
-- 消费端 autoAck为false
com.rabbitmq.client.Channel#basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback);
...
-- requeue 重新入队为false
com.rabbitmq.client.Channel#basicReject(long deliveryTag, boolean requeue);
com.rabbitmq.client.Channel#basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
?队列达到最大长度,先入队的消息会被放至DLX
?可以为普通的队列设置死信交换机,当队列中的消息变为死信后会进入指定的死信交换机,然后定义我们的死信队列来接收死信交换机里的内容:
String myDeadLetterExchange = "MY_DLX_EXCHANGE";
String myDeadLetterQueue = "MY_DLX_QUEUE";
Map<String, Object> commonQueueArgs = new HashMap<>();
commonQueueArgs.put("x-message-ttl", 60000);
commonQueueArgs.put("x-max-length", 4); // 队列最大长度
commonQueueArgs.put("x-expires", "9000"); // TTL过期时间
commonQueueArgs.put("x-dead-letter-exchange", myDeadLetterExchange);
// 声明普通队列
channel.queueDeclare("MY_TTL_QUEUE", true, false, false, commonQueueArgs);
// 声明死信交换机
channel.exchangeDeclare(myDeadLetterExchange, "topic");
// 声明死信队列
channel.queueDeclare(myDeadLetterQueue, false, false, false, null);
// 绑定
channel.queueBind(myDeadLetterQueue, myDeadLetterExchange, "#");
5.3 优先级队列
? 设置队列优先级属性x-max-priority
及消息优先级属性com.rabbitmq.client.AMQP.BasicProperties#priority,当队列中消息堆积时,队列会根据属性优先级大小进行优先消费
5.4 延迟队列
?本身并不支持延迟队列,可以使用TTL加DLX,队列过期时间结合私信队列来实现。
?或者使用插件 rabbitmq-delayed-message-exchange
5.5 RPC
5.6 服务端流控
5.6 消费端限流
6. UI管理界面
7. Spring配置方式集成RabbitMQ
8. Spring Boot集成RabbitMQ
以上是关于分布式消息通信之RabbitMQ_01的主要内容,如果未能解决你的问题,请参考以下文章
消息队列_RabbitMQ-0001.RabbitMQ消息代理/队列服务器快速安全部署?
RabbitMQ01_消息队列概述使用场景劣势架构图与主要概念Docker快速安装Rabbitmq角色分类
RabbitMQ01_消息队列概述使用场景劣势架构图与主要概念Docker快速安装Rabbitmq角色分类