C#RabbitMQ高阶指南
Posted JimCarter
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#RabbitMQ高阶指南相关的知识,希望对你有一定的参考价值。
续上一篇文章:【C#】RabbitMQ进阶指南
文章目录
引言
本文将会在原理层面,进一步剖析并了解RabbitMQ的实现原理,可以帮助我们在实际工作中透过现象看到本质。比如一个队列的内部存储其实是由5个子队列来流转运作的,队列中的消息可以有4中不同的状态等,通过这些可以明白在使用RabbitMQ时尽量不要有过多的消息堆积,不然会影响整体性能。
1. 存储机制
消息的持久化与否,与消息是否写入磁盘没有必然联系。非持久化消息也可以写入磁盘。
- 持久化消息:在到达队列时就被写入磁盘,并且如果可以话,内存中也可以保存一份,当内存吃紧时才从内存中移除,这样可以提高一定的性能。
- 非持久化消息:一般保存在内存中,当内存吃紧时会被写入到磁盘中。
以上两类消息的落盘处理都是在RabbitMQ的“持久层”中完成。
持久层包含两部分:
- 队列索引(rabbit_queue_index):每个队列都已一个与之对应的索引,负责维护队列中落盘消息的信息,如:存储地点,是否交付给消费者,消费者是否ack等。
- 消息存储(rabbit_msg_store):以键值对的形式存储消息,被所有队列共享,每个RabbitMQ节点有且仅有一个。rabbit_msg_store内部又分为msg_store_transient和msg_store_persistent,分别用来负责非持久化消息才存储和持久化消息的存储。
消息可以存储在“队列索引”中也可以存储在“消息存储”中。比较理想的情况是较小的消息存储在“队列索引”中,较大的消息存储在“消息存储”中,这个大小的界限可以通过queue_index_embed_msgs_below
配置,默认是4KB。这个大小是整个消息的大小,包括了消息体+属性+header。
存储
“队列索引”中以顺序的段文件来进行存储(文件名从0开始累加),后缀为.idx。每个段文件固定有SEGMENT_ENTRY_COUNT
条记录(默认值是16384)。每个“队列索引”从磁盘中读取消息的时候至少要在内存中维护一个段文件。所以设置queue_index_embed_msgs_belowe
时要格外谨慎,及时增加一点点,也有可能引起内存爆炸式增加。
经过“消息存储”处理的所有消息都会已追加的方式写入到文件中,当一个文件的大小超过一定的限制时(file_size_limit
),会关闭这个文件并创建一个新文件,文件后缀都是.rdq且从0开始累加。文件经最小的文件就是最老的文件。在进行消息存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录这个消息在文件中的位置映射和文件的相关信息。
读取
当读取消息时,先根据消息的msg_id找到对应的rdq的文件,如果文件存在且未被锁定,则直接打开文件并返回指定位置的消息内容。如果文件不存在或者被锁定,则发送请求由“消息存储”进行处理。
删除
消息的删除只是从ETS表中删除指定消息的相关信息,同事更新消息对应的rdq文件的相关信息。当消息删除时,并不会立即把消息从rdq文件中移除,只是将其标记为垃圾数据而已。当一个文件中所有的消息都被标记为垃圾消息时,这个文件就可以删除了。
GC
当检测到前后两个文件中的有效数据可以合并到一个文件中,且所有的垃圾数据的大小和所有文件(至少3个)的数据大小的比值超过设置的阈值(默认0.5)时,才会触发合并操作。
如下图所示,执行合并时首先锁定这两个文件,并先对前面文件的有效数据进行整理,再将后面文件的有效数据写入到前面,同时更新ETS记录,最后删除后面的文件。
1.1 队列的结构
通常队列由rabbit_amqueue_process和backing_queue两部分组成,前者负责协议相关的消息处理,比如:接受生产者发布的消息、向消费者交付消息、处理消息的确认等。后者是消息存储的具体形式和引擎,并提供相关的接口供前者调用。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么会将消息直接投递给消费者,而不是按部就班的将消息放到队列中。当无法将消息直接投递给消费者时,才需要将消息暂时存到队列中以供下次投递。
当消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断的流动,消息的状态也在不断发生变化。队列的消息可能处于以下4种状态之一:
- alpha:消息内容(消息体,属性,headers)和消息索引都存储在内存中。
- beta:消息内容在磁盘中,消息索引在内存。
- gamma:消息内容在磁盘中,消息索引在内存和磁盘中都有。(只有持久化的消息才会有的状态)。
- delta:都在磁盘中。
对于持久化的消息,消息内容和消息索引都必须先保存在磁盘上,才会处于上述状态中的某一种。
区分以上4种状态的主要作用就是为了满足不同的内存和CPU要求。比如当内存吃紧时就会将部分alpha状态的消息更改为beta、gamma、delta等状态。
- alpha状态最耗内存,但很少消耗cpu。
- delta状态基本不消耗内存,但是需要更多的CPU和磁盘IO。需要执行两次IO操作才能读取到消息,第一次是读取消息索引(从rabbit_queue_index中),第二次是读取消息内容(从rabbit_msg_store中)。
- beta和gamma都只需要一次IO操作(从rabbit_msg_store中)就可以读取到消息。
对于一般的没有优先级的队列和镜像队列来说,backing_queue的默认实现是rabbit_variable_queue,内部是通过5个子队列Q1、Q2、Delta、Q3、Q4来提现消息的各个状态。
所以RabbitMQ的队列其实是rabbit_amqueue_process+5个子队列组成。
结构参考如下:
其中Q1\\Q4只包含alpha状态的消息,Q2、Q3包含beta和gamma状态的消息,delta只包含delta状态的消息。正常情况下,消息是按照Q1->Q2->Delta->Q3->Q4这样的顺序进行流动,但并非每一条消息都会经历所有的状态,主要是取决于当前系统的负载情况。
当消费者获取消息时,会首先从Q4中获取,如果获取成功则返回,如果Q4为空则从Q3获取,此时后续步骤分为两种:
- 如果Q3也为空则认为此时队列里没有消息。
- Q3不为空,则返回Q3里的消息,然后判断Q3和Delta的长度:
- 如果都为空,则可以认为Q2\\Delta\\Q3\\Q4全部为空,此时将Q1的消息直接转到Q4中,便于下次直接从Q4读取。
- 如果Q3为空但Delta不为空,则需要将Delta里的消息转移到Q3中。
将消息从Delta转移到Q3的过程,是按照所以索引分段读取的,首先读取第一段,然后判断读取的消息个数和Delta中消息的个数是否相等,如果相等则可以判断Delta已空,则直接将Q2和刚读到的消息一并放入到Q3中。如果不相等,则只将此次读取到消息放入到Q3中。
通常在负载正常的情况下,如果消息的消费速度大于等于生产速度,且不需要保证不丢失。则消息极有可能只会处于alpha状态。对于durable
设置为true
的消息,它一定会进入到gamma状态,并且开启publisher confirm机制时,只有到了gamma状态时才会确认该消息已被接受。
在负载很高的情况下,消息不能被很快的消费掉,这些消息就会进入到很深的队列中去,处理每个消息的开销将会很大。应对这种问题一般有三种措施:
- 增加prefetch_count的值,即一次性发送多条消息给消费者。
- 采用multiple ack,降低ack带来的开销。
- 流控,见下节。
1.2 惰性队列(lazy queue)
惰性队列是RabbitMQ 3.6版本引入的概念。该队列会尽可能的将消息存入磁盘中(不管是持久化消息还是非持久化),它的设计目标是为了支持更多的消息存储。减少了内存的消耗,但是会增加IO操作。如果消息是持久化的,那么这些IO操作也是无可避免的。如果消息是非持久化的,重启RabbitMQ服务之后,这些消息一样也会消失。
默认情况下,生产者消息发发送到RabbitMQ之后,队列里的消息会尽可能的存储在内存中,目的是为了更快的将消息投递给消费者。即使是持久化的消息,写入磁盘后也会在内存中保留一份。当内存不足时,RabbitMQ会将内存中的消息写入到磁盘中,而这个操作会耗费比较长的时间,也会阻塞队列的操作,进而无法接受新消息。
队列具有两种模式:default和lazy,默认为lazy。在声明队列的时候可以更改其模式。惰性队列相比普通队列只有很小的内存开销,比如要发送1000w条1kb的消息,普通队列会消耗1.2GB的内存,而惰性队列只消耗1.5MB内存。
根据官方的显示,普通队列如果发送1000w条数据,耗费801秒,平均发送速度是13000/s. 如果是惰性队列,耗时471秒,平均发送速度是24000/s。出现如此之大的性能是因为普通队列会因为内存不足需要将消息换页至磁盘。
2. 内存和磁盘告警
当内存使用超过设定值或者磁盘剩余空间低于设定值时,RabbitMQ就会暂时block客户端的connection,并停止接受从客户端发来的消息,以此避免服务崩溃。此时,客户端与服务端的心跳检测也会失效。
被阻塞的connection的状态有以下两种:
- blocking:这种对应的是并不发送消息的connection,如消费者创建的connection。这种情况下消费者可以继续运作消费消息。
- blocked:这种对应的是消费者会继续发送消息的connection。此时这个connection会禁止发送任何数据。
注意:
在一个集群中,如果一个broker节点的内存或者磁盘受限,都会引起整个集群中所有的connection被阻塞。
当然理想的情况是,当发生受限时,只阻塞生产者,不阻塞消费者。但在AMQP协议中,一个channel上可以同时包含生产者和消费者,一个connection中也可以同时有多个生产者channel和多个消费者的channel,实现起来就没有那么容易。建议的做法是:将生产者和消费者的逻辑分摊到不同的connection中,使其不产生任何交集。 客户端可以添加BlockedListener来监听响应连接的阻塞信息。
2.1 内存告警
当RabbitMQ服务启动时或者执行rabbitmqctl set_vm_memory_high_watermark fraction
时,会计算操作系统内存的大小。默认的阈值为0.4即40%。表示当RabbitMQ使用的内存超过40%时就会产生内存告警并阻塞所有生产者的连接。当告警解除之后,一切恢复正常。如果无法计算操作系统的内存大小,则会输出日志,并将其假设为1GB。
将阈值设置为40%并不意味着RabbitMQ不能使用超过40%的内存,这仅仅限制了RabbitMQ的生产者。在最坏的情况下,Erlang的GC机制会导致两倍的内存消耗,即占用80%内存。
如果阈值设置为0,则表示禁止集群中所有消息的发布。正常情况下此值建议在0.4-0.66之间,最好不要超过0.7.
阈值除了支持百分比,还支持绝对值,默认单位是B(如果没有写具体单位的话)。还支持KB\\MB\\GB单位。
在某个Broker节点触及内存并阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以节省内存空间。持久化和非持久化消息都会被存储到磁盘中,因为持久化的消息本省就在磁盘中,所以这里只是会将其从内存中删除。
默认情况下,使用内存达到阈值的50%时会触发换页操作。
2.2 磁盘告警
默认磁盘阈值为50MB,当磁盘剩余空间低于此值时,会阻塞生产者并停止内存中的换页动作。磁盘剩余空间检测操作默认每10秒执行一次,当剩余空间与阈值越来越接近时,检测频率也会越来越大。当要达到磁盘阈值时,检测频率为每秒10次。
一个相对谨慎的做法是将磁盘阈值的大小设置为操作系统内存的大小。
3. 流控
除了上述内存和磁盘的阈值之外,RabbitMQ又引入了流控来确保服务的稳定。
流控机制时用来避免消息发送速率过快而导致服务器难以支撑的情况。内存和磁盘的告警相当于全局的流控,一旦触发就会阻塞集群中所有的connection,而流控是针对的是单个connection。
3.1 流控原理
Erlang进程之间的通讯不是通过共享内存(binary类型除外),而是通过消息传递。每个进程都是自己的进程邮箱(mailbox)。默认情况下,Erlang并没有对进程邮箱的大小进行限制,所以当有大量消息持续的发送到某个进程上时,会导致该邮箱过大,最终会OOM。
RabbitMQ使用了一种基于信用的算法(credit-baseed algorithm)的流控机制来限制发送消息的速率以解决前面的问题。它会监控多个进程的邮箱,当某个进程负载过高而来不及处理消息时,其邮箱就会堆积消息。当堆积到一定量时就会阻塞而不再接受上游的消息。从而上游也会逐渐堆积消息,当堆积到一定量又会阻塞并不再接受其上游的消息,最后就会使得负责网络数据包接受的进程阻塞而暂停接收新消息。当下游的消息得到处理之后,会通知上游,这样上游就可以继续发送消息。由此,基于信用算法的流控机制最终将消息发送进程的发送速度限制在了消息处理进程的处理能力范围之内。
一个连接触发流控时会处于“flow”状态,也就是说这个connection的状态每秒在blocked和unblocked之间来回切换,处于flow状态的connection和处于running状态的connection并没有很大区别,只是说发送速度限制了。
虽然流控机制针对的是connection,但它并不仅作用于connection,同样作用于信道和队列。从connection到channel,再到队列,最后是消息持久化存储,形成了一个完整的流控链。我们可以根据这个链条找到瓶颈所在:
其中各个进程如下:
- rabbit_sender:connection的处理进程,负责接受、解析AMQP协议数据包。
- rabbit_channel:channel的处理进程,负责处理AMQP协议的各种方法,进行路由解析。
- rabbit_amqqueue_process:队列的处理进程,负责实现队列的所有逻辑。
- rabbit_msg_store:负责实现消息的持久化。
当某个connection处于flow状态,但没有channel处于flow状态时:表示这个connection中有一个或多个channel出现了瓶颈,某些channel可能在大量发送小的非持久化消息导致CPU占用过高。
当某个connection处于flow,且这个connection中也有若干channle处于flow,但是没有任何队列处于flow:表示至少一个队列出现了瓶颈。有可能是因为在发送大量小的持久化消息,导致将消息存入队列引起高CPU占用,或者将消息存入磁盘引起高IO占用。
当某个connection处于flow,这个connection下也有若干channel处于flow,channel下也有若干队列处于flow:表示消息持久化出现了瓶颈。可能是在发送大量大的持久化消息,导致存入磁盘时IO过高。
4. 镜像队列
镜像队列可以解决因单个broker故障导致的服务不可用问题。可以将队列镜像到其它broker节点上。当集群中一个节点失效了,队列能够自动切换到另外一个节点上,保证服务整体可用。
通常用法中,针对每一个配置镜像的队列都包含一个master节点和多个slave节点。如下图:
slave会遵从master发出的指定,从而确保二者状态一致。如果master失效,则资历最老(时间最长)的slave会被提升为master。
publish到镜像队列上的所有消息会被同时发往master和所有的slave。除了publish之外的所有动作,都只会发给master,然后master广播给所有的slave。
如果消费者与slave建立连接并想要消费消息,但是其实质上都是从master获取消息并消费。因为slave会将消费者获取消息的请求转发给master,master会准备好消息,传给slave,最后slave将此消息交给消费者。
此时你可能会有一个疑问,这么多的操作都集中到master上,那master会不会有很大压力?答案是否定的,这里的master和slave都是针对队列而言的,而队列可以均匀的散落在集群的各个broker节点上,从而达到负载均衡。可以参考下图:
从上图我们可以看到,Q1的master节点存在于broker1中,slave节点存在于其他broker中。Q2的master没有存放在broker1中而是在broker2上,slave节点存在于其他broker中。
注意:镜像队列同时支持publish confirm和事务。如果是事务,则事务在全部镜像中执行之后,客户端才会收到ok的消息。同样所有slave都接受到消息之后,生产者才会收到confirm。
镜像队列的更多详细内容,如不同于普通队列的backing_queue结构和GM组播实现方式,请参考:《RabbitMQ实战指南》。
镜像队列的引入可以极大提升RabbitMQ的可用和可靠性,提供了数据冗余备份、避免单点故障引起的问题,强烈建议在生产环境中为每个重要队列都配置镜像。
以上是关于C#RabbitMQ高阶指南的主要内容,如果未能解决你的问题,请参考以下文章