学习笔记《RabbitMQ实战指南》笔记
Posted 棉花糖灬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学习笔记《RabbitMQ实战指南》笔记相关的知识,希望对你有一定的参考价值。
本文摘录总结自《RabbitMQ实战指南》。
一、消息中间件
消息队列中间件(MessageQueueMiddleware,简称为MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它一般有两种传递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。
消息中间件的作用如下:
-
解耦:消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可;
-
冗余〈存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险;
-
扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数;
-
削峰:访问量剧增的情况并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩惯;
-
可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理;
-
顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性;
-
缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度;
-
异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。
二、RabbitMQ
RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
1. 生产者和消费者
(1) 生产者
Producer:生产者,就是投递消息的一方。
生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体和标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)。
(2) 消费者
Consumer:消费者,就是接收消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
2. Broker和队列
(1) Broker
Broker:消息中间件的服务节点。
对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。
(2) 队列
Queue:队列,是RabbitMQ的内部对象,用于存储消息。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。
3. 交换器、路由键和绑定键
(1) 交换器
Exchange:交换器。
生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,要么会返回给生产者,要么直接丢弃。
(2) 路由键
RoutingKey:路由键。
生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
(3) 绑定键
BindingKey:绑定键。
RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
在使用绑定的时候,其中需要的路由键是BindingKey。在发送消息的时候,其中需要的路由键是RoutingKey。一般可以把两者认为是等价的。
4. 交换器类型
RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种。
(1) fanout
一般翻译为扇出或广播模式。它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
(2) direct
direct类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
(3) topic
topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里的匹配规则有些不同,它约定:
-
RoutingKey为一个点号
.
分隔的字符串(被点号.
分隔开的每一段独立的字符串称为一个单词),如"com.rabbitmq.client"; -
令BindingKey和RoutingKey一样也是点号
.
分隔的字符串; -
BindingKey中可以存在两种特殊字符串
#
和*
,用于做模糊匹配,其中#
用于匹配一个单词,而*
用于匹配多规格单词(可以是零个)。
上图中路由键为"com.rabbitmq.client"的消息会同时路由到Queuel和Queue2,路由键为"com.hidden.client"的消息只会路由到Queue2中。
(4) headers
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
5. RabbitMQ运转流程
(1) 生产者发送消息流程
-
生产者连接到RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel);
-
生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等;
-
生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等;
-
生产者通过路由键将交换器和队列绑定起来;
-
生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息;
-
相应的交换器根据接收到的路由键查找相匹配的队列;
-
如果找到,则将从生产者发送过来的消息存入相应的队列中;
-
如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者;
-
关闭信道;
-
关闭连接。
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(messageacknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
(2) 消费者接收消息流程
-
消费者连接到RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel);
-
消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作;
-
等待RabbitMQ Broker回应并投递相应队列中的消息;
-
消费者确认(ack)接收到的消息;
-
RabbitMQ从队列中删除相应己经被确认的消息;
-
关闭信道;
-
关闭连接。
RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。
无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
6. AMQP协议
AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey与绑定时的BindingKey相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。
三、RabbitMQ进阶
1. 消息何去何从
mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ提供的备份交换器(Altemate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定〉存储起来,而不用返回给客户端。
(1) mandatory参数
当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃。生产者可以通过调用channel.addReturnListener来添加ReturnListener监昕器实现。
(2) immediate参数
当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
概括来说,mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。RabbitMQ 3.0版本去掉了immediate参数,因为会影响镜像队列的性能,增加代码复杂性。
(3) 备份交换器
备份交换器,英文名称为Altemate Exchange,简称AE,或者更直白地称之为"备胎交换器"。
生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。
2. 过期时间(TTL)
TTL,Time to Live的简称,即过期时间。RabbitMQ可以对消息和队列设置TTL。
目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成"死信"(Dead Message),消费者将无法再收到该消息(这点不是绝对的)。
对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为需要扫描一遍队列才知道哪些消息是过期的,开销较大,采用当消息被发给消费者时判断是否到期并抹去的方法。
3. 死信队列
DLX,全称为Dead Letter Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
-
消息被拒绝(Basic.Reject/Basic.Nack),井且设置requeue参数为false;
-
消息过期;
-
队列达到最大长度。
4. 延迟队列
延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的DLX和TTL模拟出延迟队列的功能。假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过exchange.normal这个交换器将发送的消息存储在queue.normal这个队列中。消费者订阅的并非是queue.normal这个队列,而是queue.dlx这个队列。当消息从queue.normal这个队列中过期之后被存入queue.dlx这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。
5. 优先级队列
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
如果在消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
6. 持久化
持久化可以提高RabbitMQ的可靠性,以防在异常情况(重启、关闭、右机等)下的数据丢失。
如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。要确保消息不会丢失,需要将消息和队列都设置为持久化,但对消息设置持久化会严重影响RabbitMQ的性能。
将交换器、队列、消息都设置了持久化之后也不能百分之百保证数据不丢失。
7. 生产者确认
当消息的生产者将消息发送出去之后,生产者如何得知消息到底有没有正确地到达服务器呢?RabbitMQ针对这个问题,提供了两种解决方式:
- 通过事务机制实现;
- 通过发送方确认(publisher confirm)机制实现。
(1) 事务机制
RabbitMQ客户端中与事务机制相关的方法有三个:channel.txSelect、channel.txCommit和channel.txRollback。channel.txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,channel.txRollback用于事务回滚。
(2) 发送方确认机制
生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。
(3) 两者的区别
事务机制和publisher∞nfirm机制两者是互斥的,不能共存。事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。
publisherconfmn的优势在于并不一定需要同步确认。所以可以改进一下使用方式,总结有如下两种:
-
批量confirm方法:每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回;
-
异步confirm方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会因调这个方法进行处理。
批量confirm在出现返回Basic.Nack或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量confirm的性能应该是不升反降的。
8. 消费者的要点
对于RabbitMQ消费端来说,还有几点需要注意:
- 消息分发;
- 消息顺序性。
(1) 消息的分发
当RabbitMQ队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。但没有考虑到每个生产者的性能以及当前任务量等。可以用channel.basicQos方法允许限制信道上的消费者所能保持的最大未确认消息的数量。
(2) 消息的顺序性
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。
如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。
以上是关于学习笔记《RabbitMQ实战指南》笔记的主要内容,如果未能解决你的问题,请参考以下文章