RabbitMQ-AMQP模型详解
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-AMQP模型详解相关的知识,希望对你有一定的参考价值。
前言
本篇文章会解析RabbitMQ的核心概念, 解析 包括connection、exchange、queue、中间代理等来解析amqp模型。
概念
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种 客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP基本流程
通过broker代理, 客户端需要和中间件mq进行交互,连上来时,比较重要的概念虚拟主机;一个连接创建多个通道,所有的发送接收消息,都是通过通道channel进行交互的 ,exchange根据自身类型进行进行绑定,然后根据规则,发送给对应的消费者。
生产者流程:
- 生产者程序启动时连接到RabbitMQ Broker,建立一个连接(Connection), 开启一个或多个 通道(Channel)
- 生产者通过Channel声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等 3. 生产者通过Channell明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息 6. 相应的交换器根据接收到的路由键查找相匹配的队列
- 如果找到,则将从生产者发送过来的消息存入相应的队列中
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 生产者程序关闭时关闭通道、关闭连接
消费者流程:
- 消费者连接到RabbitMQ Broker,建立一个连接(Connection) ? 开启一个或多个通道 (Channel)
- 消费者向RabbitMQ Broker请求消费相应队列中的消息
- RabbitMQ Broker投递相应队列中的消息,消费者接收消息
- 消费者确认(ack)接收到的消息
- RabbitMQ Broker从队列中删除相应己经被确认的消息
- 消费者程序关闭时关闭通道、关闭连接
各个组件
broker
消息中间服务点
对于RabbitMQ来说, 一个RabbitMQ Broker可以简单地看作一个RabbitMQ服 务 节点,或者RabbitMQ服务实例。也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。
Vhost Host
Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。 虚拟主机是共享相同的身份认证和加密环境的独立服务器域。
每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换 : 器、绑定和权限机制。
vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是 /。
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("mike"); // 不指定则连接到 "/" vhost
如果指定则访问 该类型下面的rabbitmq
- 命令行工具来创建: rabbitmqctl add_vhost qa1 直接连客户端执行
- 管理控制台创建 利用admin创建就行 设置权限等
在服务台有删除等操作
创建虚拟主机过后做限量,最大连接数等等。
Connection
AMQP 0-9-1连接是TCP长连接。连接使用身份验证,可以使用TLS进行保护。当应用程序
不再需要连接到服务器时,才应该关闭AMQP 091连接。
—般是在启动客户端程序时创建连接(一般一个客户端一个连接,对于大流量的应用创建
需要数量的连接),在客户端程序关闭时,关闭连接。
/ / lx 创建连接工厂
ConnectionFactory factory = new ConnectionFactoryO;
〃 2、设置连接属性
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
// 默认使用 “guest”
factory.setPassword("admin");//默认使用” guest"
// factory.setVirtualHost(virtualHost); // 不 指 定 则 连 接 到 vhost
〃3、从连接工厂获取连接
Connection connection = factory.newConnection("生产者”);//可以给连接命个名
在springboot中
创建连接缓存的连接工厂
默认情况下 单连接的连接工厂。
默认,首先会缓存一个Channel,然后再慢慢增 ,高并发使用场景、channelCacheSize 设置缓存的数量 100(不会上来就定义100个通道)
默认的缓存模式是缓存通道。如果把缓存模式设为CacheMode.CONNECTION,则缓存连接以及连接上创建Channel. connectionCacheSize 属性设置缓存多少个连接
CacheMode.CONNECTION 与 Rabbit Admin ( AmqpAdmin) 不兼容,不会自动创建exchange、queues 等
Channel
Channel:通道,是建立在Connection连接之上的一种轻量级的连接。 大部分的操作是在Channel这个个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclares队列的绑定queueBind、发布消息basicPublish、 消费消息basicConsume等。
如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中
的其中一束光纤。一个Connection±可以创建任意数量的Channel。
通道与mq服务器进行交互。
一个线程使用一个channel
如果多个channels的消息流量很大,单连接成为io瓶颈,则开辟多个Connection
也就是在高并发情况下可以使用这种方式
一定不能多个线程共用通道,会造成数据异常的。
尽量少建立连接,降低资源消耗
创建通道时,都会创建编号用以返回使用 channel-1 唯一标识。
RoutingKey
RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个 RoutingKey,用来指定这个消息的路由规则。
RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用
在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交 换器时,通过指定RoutingKey来决定消息流向哪里。
Exchange
Exchange:交换器,生产者将消息发送到Exchange (交换器,通常也可以 用大写的“X” 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或返回给生产者,或直接丢弃。
交换器的属性 ,声明交换器时,包含了类型、自动删除 内部的 参数
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable, // 交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系。
boolean autoDelete, // 当没有队列绑定到它时 是否自动删除
boolean internal, // 是否是 MQ 内部使用的, 我们就不能在客户端中使用。
Map<String, Object> arguments)
参数对应的意思 包括交换器是否持久化 避免重启后,要再次创建。 和消息的持久化没关系 ;
自动删除 mq是否内部使用。 在使用时集中在几个参数上。
对于非持久化,重启过后 exchange被删除
Queue
Queue:队列,是RabbitMQ的内部对象,用于存储消息。 也是有这几个属性,包括持久化, 是否排他 以及是否自动删除 队列参数等。
-
Name 应用程序可以选择队列名称,或者要求代理为它们生成一个名称,最长 255 字节 UTF-8字符。如想要Broker为我们生成队列名,可以在声明创建Queue时传入空字符"",在返回值中可以取得生成的队列名。
-
Durable 是否持久存储,如为false, broker restart就没有了
-
Exclusive 独占,被一个connection独占使用,当connection 关闭时Queue也被删除
-
Auto-delete 是否在Queue的最后一个消费者关闭时自动删除Queue
-
Arguments 可选的被插件和Broker特殊特性使用的参数,如message TTL, queue length limit 等
【注意】以amq.开头的队列名称 是保留给Broker内部使用的,如果用户创建这样的队列会异常。
【注意】队列的持久性,跟消息的持久化也没关系。
Queue 的 TTL TIME TO LIVE
autoDelete 队列空闲一段时间之后再删除。
-
policy方式
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues
expiry 策略名称 自定义
".*" 作用目标名称的正则表达式
'{"expires":1800000}' 策略定义 过期时间设置 单位毫秒
--apply-to queues 应用于哪一类实体
代码中声明队列是指定
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
channel.queueDeclare("queue1", false, false, false, args);
Binding
Binding:绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候 一般会指定一个绑定键(BindingKey ) , 这样RabbitMQ就知道如何正确地将消 息路由到队列了。
Exchange 类型
RabbitMQ常用的交换器类型有fanout、directs topic、headers这四种
fanout:扇型交换机
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
direct:直连交换机 它会把消息路由到那些BindingKey和 RoutingKey完全匹配的队列中
topic:主题交换机
与direct类似,但它可以通过通配符进行模糊匹配
headers:头交换机
不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的
headers属性进行匹配
headers类型的交换器性能很差,而且也不实用。
Message
消息一般可以包含两个部分:消息体和消息属性
消 息 体 (payload)
在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可
以进一步对这个消息体进行序列化操作。对RabbitMQ,消息体就是一个字节序列,可以没有 消息体。
消息属性
用来描述这条消息,比如目标交换器的名称、路由键和消息体的描述等等。
在代码中都是作为对象进行传输的。
在message中 会有个消息转换器,将字节码进行转换,可以将字符串实例或者字节数组进行转换
消息属性
这都可以在消息头中添加设置 内容类型。 内容编码方式
获取属性 通过message中去获取属性
在spring中通过header去取
@RabbitListener(queues = "spring-queue8")
public void receive(String in, @Headers Map<String, Object> headers,
@Header(AmqpHeaders.EXPIRATION) String expiration, @Header(AmqpHeaders.REPLY_TO) String replyTo,
@Header(AmqpHeaders.CONTENT_TYPE) String contentType) {
System.out.println(" [x] Received '" + in + "'");
System.out.println(headers);
System.out.println(expiration);
System.out.println(replyTo);
System.out.println(contentType);
}
以上是关于RabbitMQ-AMQP模型详解的主要内容,如果未能解决你的问题,请参考以下文章