RabbitMQ研究组件与协议
Posted wu6660563
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ研究组件与协议相关的知识,希望对你有一定的参考价值。
概念
生产者和消费者
Producer: 生产者,就是投递消息的一方。
生产者创建消息,然后发布到RabbitMQ。一般分为两部分:消息体和标签。消息体被称为payload。消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化。消息的标签用来表述消息,比如一个交换器的名称和一个路由键。
Consumer:消费者,就是接收消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息,只是消费消息的消息体,在路由的时候,消息的标签会丢弃,存入队列的只有消息体。
Broker:消息中间件的服务节点。
一个Broker可以看作是一个服务节点,或者称为服务实例
生产者将业务方数据进行序列化,封装成消息,发送到Broker中。消费者订阅并接收消息,经过可能的解包之后得到原始数据,之后再进行业务处理。
队列
Queue:队列,是RabbitMQ的内部对象,用于存储消息
RabbitMQ中的消息只存储在队列中,多个消费者可以订阅同一个队列,这时的队列消息会被平摊,给多个消费者进行处理,并不是说每个消费者都收到所有的消息处理
交换器、路由键、绑定
Exchange:所有的消息都发送给交换器,由交换器来选择路由。
生产者将消息发送给Exchange中,由于交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃
RoutingKey:路由键。
生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键联合使用才能最终生效。
总结:交换器和绑定键固定的时候,生产者把消息发送到交换器中,由RoutingKey来决定消息流向哪里。
Binding:绑定。RabbitMQ通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey)。
一个队列可以有多个绑定键
第一次看到这个的时候可能会有些费解,为什么要分成两个。一个就搞定了,这个就涉及到RabbitMQ的交换器类型了,direct
确实是两个必须一致,但是如果需要匹配一批的时候,topic模式
,后面会详细介绍
交换器类型
RabbitMQ常用的交换器类型有fanout
、direct
、topic
、headers
四种。AMQP协议中还有两种System
和自定义
-
fanout
不处理路由键,。会把所有的发送到该交换器的消息路由到所有与该交换器绑定的队列中。适用于广播的方式,需要将一个消息广播到所有的队列 -
direct
direct类型的交换器最简单,他是将消息路由到BindingKey和RoutingKey完全匹配的队列中。就是路由键=绑定键 -
topic
路由键通过类似正则的方式绑定键。direct是完全匹配BindingKey和RoutingKey,但是这种方式不能满足业务需求。可能需要以某些开头的方式来匹配等
匹配规则:
1)RoutingKey为一个点号分割独立的的单词
2)BindingKey和RoutingKey也一样是点号分隔的字符串
3)BindingKey中可以存在两种特殊字符串*
和#
,用于模糊匹配,其中*
用于匹配一个单词,#
表示零个或者多个单词。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
4)如果匹配不了,通过设置mandatory=true返回给生产者,mandatory=false表示丢弃,默认为false
- headers
headers类型的交换器不依赖于路由键的匹配来路由消息,而是根据发送的消息内容中的headers
属性来匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器的时候,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers的类型性能很差,不推荐使用
运转流程
重新梳理一下RabbitMQ的生产者发送消息的流程
- 生产者链接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
- 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化,是否自动删除等
- 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等
- 相应的交换器根据接收到的路由键查找相匹配的队列
- 如果找到,则从生产者发送过来的消息存入相应的队列中
- 如果没有找到,根据mandatory属性来选择丢弃还是回退到生产者
- 关闭信道
- 关闭连接
消费者接收消息的流程:
- 消费者链接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
- 消费者向Broker请求消费响应队列中的消息,可能会设置相应的回调函数
- 等待Broker回应并投递相应队列中的消息,消费者接收消息
- 消费者确认(ack)接收到的消息
- RabbitMQ从队列中删除相应的已经被确认的消息
- 关闭信道
- 关闭连接
AMQP协议
AMQP模型就是:生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时携带的RoutingKey和绑定的BindingKey匹配的时候,消息被存入相应的队列。
AMQP协议包含三层:
- Module Layer:位于协议最高层,主要定义一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑
- Session Layer:位于中间层,主要是负责将客户端命令发送给服务端,再将服务端的应答返回给客户端,主要为客户端和服务端之间的通信提供可靠性同步机制和错误处理
- Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测、数据表示
当客户端与Broker建立连接的时候,会调用factory.newConnection方法,这个方法会进一步封装成Protocol Header的报文头发送给Broker,以此通知Broker本次交互采用的是AMQP协议,紧接着Broker返回connection.start来建立链接,在连接的过程中涉及 connection.start/.Start-OK、Connection.tune/。Tune-OK、Connection.Open/.Open-OK这六个命令的交互。
当客户端调用connection.createChannel方法准备开启信道的时候,其包装Channel.Open命令发送给Broker,等待Channel.Open-OK命令。
当客户端发送消息的时候,需要调用channel.basicPublish方法,对应的AQMP命令为Basic.Publish,注意这个命令和前面涉及的命令有不同,因为这个命令包含了ContentHeader和ContentBody。ContentHeader里面包含的消息体的属性。
当客户端发送消息需要关闭资源时,涉及Channel.Close/.Close-OK与Connection.Close/.Close-OK
消费者客户端同样需要与Broker建立连接,与生产者客户端一样,协议交互同样涉及Connection.Start/ . Start-Ok 、Connection.Tune/.Tune-Ok 和Connection.
Open/ . Open-Ok
接着在Connection之上建立Channel,和生产者客户端一样,协议涉及Channel.Open/Open-OK
如果在消费之前调用了channel.basicQos(int prefetchCount)的方法来设置消费者客户端最大能够保持的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-OK
在真正消费之前,消费者客户端需要向Broker发送Basic.Consume命令将Channel设置为接收模式,之后Broker回执Basic.Consume-OK以告诉消费者客户端准备好消费消息。Broker向消费者客户端推送消息,即Basic.Deliver命令,会携带Content Header 和Content Body。
消费者接收到消息并正确消费之后,向Broker发送确认,即Basic.Ack命令
消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及Channel.Close/.Close-OK和Connection.Close/.Close-OK
AMQP命令
名称 | 是否包含内容体 | 对应客户端中的方法 | 简要描述 |
---|---|---|---|
Connection.Start | 否 | factory.newConnection | 建立连接相关 |
Connection.Start-OK | 否 | 同上 | 同上 |
Connection.Tune | 否 | 同上 | 同上 |
Connection.Tune-OK | 否 | 同上 | 同上 |
Connection.Open | 否 | 同上 | 同上 |
Connection.Open-OK | 否 | 同上 | 同上 |
Connection.Close | 否 | connection.close | 关闭连接 |
Connection.Close-OK | 否 | 同上 | 同上 |
Channel.Open | 否 | connection.openChannel | 开启信道 |
Channel.Open-OK | 否 | 同上 | 同上 |
Channel.Close | 否 | channel.close | 关闭信道 |
Channel.Close-OK | 否 | 同上 | 同上 |
Exchange.Declare | 否 | channel.exchangeDeclare | 声明交换器 |
Exchange.Declare-OK | 否 | 同上 | 同上 |
Exchange.Delete | 否 | channel.exchangeDelete | 删除交换器 |
Exchange.Delete-OK | 否 | 同上 | 同上 |
Exchange.Bind | 否 | channel.exchangeBind | 交换器和交换器绑定 |
Exchange.Bind-OK | 否 | 同上 | 同上 |
Exchange.Unbind | 否 | channel.exchangeUnbind | 交换器和交换器解绑 |
Exchange.Unbind-OK | 否 | 同上 | 同上 |
Queue.Declare | 否 | channel.queueDeclare | 声明队列 |
Queue.Declare-OK | 否 | 同上 | 同上 |
Queue.Bind | 否 | channel.queueBind | 队列与交换器绑定 |
Queue.Bind-OK | 否 | 同上 | 同上 |
Queue.Purge | 否 | channel.queuePurege | 清除队列中的内容 |
Queue.Purge-OK | 否 | 同上 | 同上 |
Queue.Delete | 否 | channel.queueDelete | 删除队列 |
Queue.Delete-OK | 否 | 同上 | 同上 |
Queue.Unbind | 否 | channel.QueueUnbind | 队列与交换器解绑 |
Queue.Unbind-OK | 否 | 同上 | 同上 |
Basic.Qos | 否 | channel.basicQos | 设置未被确认消费的个数 |
Basic.Qos-OK | 否 | 同上 | 同上 |
Basic-Consume | 否 | channel.basicConsume | 消费消息(推模式) |
Basic-Consume-OK | 否 | 同上 | 同上 |
Basic-Cancel | 否 | channel.basicCancel | 取消 |
Basic.Cancel-OK | 否 | 同上 | 同上 |
Basic.Publish | 是 | channel.basicPublish | 发送消息 |
Basic.Return | 是 | 无 | 未能成功路由的消息返回 |
Basic.Deliver | 是 | 无 | Broker推送消息 |
Basic.Get | 否 | channel.basicGet | 消费消息(拉模式) |
Basic.Get-OK | 是 | 同上 | 同上 |
Basic.Ack | 否 | channel.basicAck | 确认 |
Basic.Reject | 否 | channel.basicReject | 拒绝(单条拒绝) |
Basic.Recover | 否 | channel.basicRecover | 请求Broker重新发送未被确认的消息 |
Basic.Recover-OK | 否 | 同上 | 同上 |
Basic.Nack | 否 | channel.basicNack | 拒绝(可批量拒绝) |
Tx.Select | 否 | channe.txSelect | 开启事务 |
Tx.Select-OK | 否 | 同上 | 同上 |
Tx.Commit | 否 | channel.txCommit | 事务提交 |
Tx.Commit-OK | 否 | 同上 | 同上 |
Tx.Rollback | 否 | channel.txRollback | 事务回滚 |
Confirm.Select | 否 | channel.confirmSelect | 开启发送端确认模式 |
Confirm.Select-OK | 否 | 同上 | 同上 |
以上是关于RabbitMQ研究组件与协议的主要内容,如果未能解决你的问题,请参考以下文章