RabbitMQ - 介绍
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ - 介绍相关的知识,希望对你有一定的参考价值。
参考技术A各组件解释如下:
AMQP 消息的路由中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"dog",则只转发 routing key 标记为"dog"的消息,不会转发"dog.puppy",也不会转发"dog.guard"等等。它是完全匹配、单播的模式。
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号"#"和符号" "。
"#"匹配0个或多个单词," "匹配不多不少一个单词。
RabbbitMQ 的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在 load 加重,那么只需要创建更多的 Consumer 来进行任务处理。
在实际应用中,可能会发生消费者收到 Queue 中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给 RabbitMQ,RabbitMQ 收到消息回执(Message acknowledgment)后才将该消息从 Queue 中移除;如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在 timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的 RabbitMQ 连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给 RabbitMQ,这将会导致严重的 bug——Queue 中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。
另外 pub message 是没有 ack 的。
如果我们希望即使在 RabbitMQ 服务重启的情况下,也不会丢失消息,我们可以将 Queue 与 Message 都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的 RabbitMQ 消息不会丢失。但依然解决不了小概率丢失事件的发生(比如 RabbitMQ 服务器已经接收到生产者的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为 RabbitMQ 的简单介绍,所以这里将不讲解 RabbitMQ 相关的事务。
要持久化队列 queue 的持久化需要在声明时指定 durable=True;
这里要注意,队列的名字一定要是 Broker 中不存在的,不然不能改变此队列的任何属性.
队列和交换机有一个创建时候指定的标志 durable,durable 的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复。
消息持久化包括 3 部分
如果 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的,如果 exchange 和 queue 两者之间有一个持久化,一个非持久化,则不允许建立绑定.
注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个 non-durable 的队列,然后想把它改变成 durable 的,唯一的办法就是删除这个队列然后重现创建。
你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ 将第 n 个 Message 分发给第 n 个 Consumer。n 是取余后的,它不管 Consumer 是否还有 unacked Message,只是按照这个默认的机制进行分发.
那么如果有个 Consumer 工作比较重,那么就会导致有的 Consumer 基本没事可做,有的 Consumer 却毫无休息的机会,那么,Rabbit 是如何处理这种问题呢?
RabbitMQ 使用 ProtoBuf 序列化消息,它可作为 RabbitMQ 的 Message 的数据格式进行传输,由于是结构化的数据,这样就极大的方便了 Consumer 的数据高效处理,当然也可以使用 XML,与 XML 相比,ProtoBuf 有以下优势:
RabbitMQ介绍5 - 集群
RabbitMQ内建集群机制,利用Erlang提供的开放电信平台(OTP,Open telecom Platform)通信框架,使得集群很容易进行横向扩展,提高系统吞吐量。这里只讨论集群的概念、原理,关于如何创建集群见官方介绍: http://www.rabbitmq.com/clustering.html
内存节点和磁盘节点
RabbitMQ将队列、exchange、绑定、vhost的配置信息(也就是创建时提供的信息)称为元数据(集群状态时,集群节点地址、节点与其它元数据的关系也是元数据),简单理解--除掉MQ中消息内容本身,其它的都是元数据。在作为单独节点运行时,RabbitMQ将所有的元数据保存在内存中,将标记为可持久化的队列,exchange(和相应的绑定)保存到磁盘。作为集群时,这些元数据可能仅仅在内存中(称为内存节点RAM node),或同时保存在硬盘中(称为磁盘节点disk node),每个集群最少有一个磁盘节点,可有0个或多个内存节点。单一节点的RabbitMQ必须是一个磁盘节点。问题:磁盘节点会保存非持久化队列的信息到磁盘吗?内存节点会保存持久化的消息到磁盘吗?都会。
Metadata writes in RAM and disk nodes.
磁盘节点保证集群重启的时候可以重建持久化的Exchange,Queue等信息,所有这些信息的更新都必须先通过磁盘节点保存。所以磁盘节点太多影响性能,磁盘节点太少不安全。这里的性能指的是创建Exchange,queue的性能,一般我们不需要经常创建、更新这些信息,使用磁盘节点不会有性能影响,但对于RPC之类的应用场景,需要频繁的创建和删除queue,便需要考虑性能问题了。
如果集群中的磁盘节点都崩溃的话,集群可以继续进行路由工作,但是不能进行元数据改动的操作,例如创建Exchange、queue,添加删除节点。
添加删除节点。节点加入或离开集群至少要通知一个磁盘节点,正常的情况是需要全部的磁盘节点在线,并更新全部磁盘节点(如果有多个磁盘节点,只通知了其中的一个,集群会出问题)。内存节点重启时会连接一个硬盘节点,获得集群元数据拷贝(内存节点唯一存储到磁盘的元数据是磁盘节点的地址)。
队列和Exchange在集群中的工作方式
队列只会在集群的一个节点上创建,也就是说队列的消息只会出现在一个节点(发送者和消费者都有可能连接到集群的其它节点,但它们都必须通过创建队列的节点首发消息),当然集群中的所有节点都知道到哪能找到这个队列。当队列所在的这个节点崩溃的时候,这个队列便从集群中消失了,附加在队列上的消费者会丢失订阅(收到异常),发送者的消息自然也无法发送到这个队列,也就是消息丢失了。这时候需要重建队列(捕获异常,重建),但是如果队列是持久化的,无法重建队列,只能等待故障节点恢复。队列只在一个节点创建的策略是出于性能的考虑,这样数据没有冗余,可以提高节点吞吐量,方便横向扩展。
Queue behavior in standalone and cluster configurations
Exchange会在集群的全部节点上创建,因为Exchange本质上是一个名称和绑定的列表(可以理解为一张查询表),在集群每个节点都保留一份并不会有性能开销。每次创建Exchange的时候,这个信息会发布到所有节点上。这样,在有节点崩溃的时候,连接在节点上的终端只需要连接到集群的其它节点,不需要重建Exchange,但是如果使用镜像队列的主备模式,不能保证exchange会恢复,所以,好习惯是终端每次连接RabbitMQ时都创建exchange。
Exchange and queue distribution in a cluster.
镜像队列
为了实现高可用性(记得持久化队列所在节点崩溃的情况么),RabbitMQ从版本2.6.0开始,提供镜像队列功能。设置为镜像的队列会在集群的多个节点上创建,这样消息会有备份。一个主队列Master可以有多个从队列slave,一旦master不可用,最老的slave成为新的master。官方介绍:http://www.rabbitmq.com/ha.html
创建镜像队列
可以在客户端通过声明队列的时候设置参数x-ha-policy来创建镜像,例如下面将队列hello-queue镜像到所有节点。
queue_args = {"x-ha-policy" : "all"}
channel.queue_declare(queue="hello-queue", arguments=queue_args)
如果要指定作为slave的节点,可以像下面这样提供slave列表。
queue_args = {"x-ha-policy" : "nodes", "x-ha-policy-params" : [[email protected]]}
channel.queue_declare(queue="hello-queue", arguments=queue_args)
也可以在服务器端通过设置Policy来配置,见连接 http://www.rabbitmq.com/ha.html#genesis 。一般采用服务端配置,实现简单,方便修改。
规则可以用来配置exchang和queue上的参数(关于Policy的解释: http://www.rabbitmq.com/parameters.html#policies ),类似于客户端创建exchange和queue提供的参数,但是并不完全一样。每个exchange或queue只能应用一个规则,如果有多个规则匹配,则使用优先级最高的规则。每个规则可以配置多个参数,例如federation-upstream-set和ha-mode,通过这两个参数可以控制federation(跨网同步插件)和mirror(镜像机制)。
问题:主节点是哪个?这取决于master选择的策略,通过参数x-queue-master-locator(规则queue_master_locator)可以设置策略。
- Pick the node hosting the minimum number of masters: min-masters
- Pick the node the client that declares the queue is connected to: client-local
- Pick a random node: random
镜像队列和异常处理
信道将消息投递到master队列和所有的slave队列,类似于fanout模式的exchange将消息路由到所有绑定的队列。这样,在使用事务或发送方确认模式时,需要多个成功事务或发送方确认消息。
异常时消费者的处理方式。
- 当slave队列所在的节点崩溃时,连接在这个节点上的消费者会丢失连接,重新连接到集群后可以重新附到master队列上;master队列所在节点上的消费者没有影响。
- 当master队列所在的节点崩溃时,连接在主节点的消费者会丢失连接,从新连接到集群后可以附到新的master队列;连接在slave队列节点上的消费者会收到一个消费者取消通知,这时候需要重新连接集群,附到新的master队列上。但有的AMQP客户端不支持消费者取消通知的含义(支持的客户端会抛出异常),这时候消费者只会傻傻等待,以为队列是空的,而事实上队列已经慢慢被消息塞满了。
- 主节点崩溃时尚未确认的消息会重新投递,因为新的主节点无法确认这些消息是确实消费者没有确认,还是由于主节点崩溃导致丢失了消费者的确认,为安全起见,会回到原位置(2.7.0版本之前会回到队列末尾)重新投递。
从故障中恢复
构建RabbitMQ集群来确保可用性和性能只是保障弹性消息通信基础架构的一半,另一半是编写当节点发生故障时知道如何重连到集群的应用程序。处理到集群的重连有多种策略,比如让程序知道所有的节点地址,出问题后可以尝试其他的节点;或者使用负载均衡,这样应用只需要知道负载均衡地址,而且可以比较均匀的将连接分布到各个节点。这里只讨论负载均衡工作方式。
负载均衡为一组服务器提供单一的访问地址,见网上介绍:https://zh.wikipedia.org/wiki/%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1_(%E8%AE%A1%E7%AE%97%E6%9C%BA)
RabbitMQ集群使用负载均衡。客户端通过共同的地址(例如localhost:5670)连接到负载均衡设备(或者负载均衡软件),负载均衡负责将连接分配到集群的具体节点。
Load balancing a RabbitMQ cluster.
连接丢失和故障转移
当集群节点出现问题时,应用程序必须要做出决定:下一个该连向哪里?应用程序需要重新连接到集群,并且重建连接、channel,和exchange、queue,也就是每次重连的时候想象自己连接了一个全新的MQ。C#客户端会自动处理重连(参考连接:http://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery),重连分为两个部分:Connection和Topology。connection部分负责网络连接的重建,topology负责exchange、queue、绑定的重建。
Automatic Recovery From Network Failures
Connection Recovery
RabbitMQ .NET/C# client supports automatic recovery of connections and topology (queues, exchanges, bindings, and consumers). The automatic recovery process for many applications follows the following steps:
- Reconnect
- Restore connection listeners
- Re-open channels
- Restore channel listeners
- Restore channel basic.qos setting, publisher confirms and transaction settings
Topology recovery includes the following actions, performed for every channel
- Re-declare exchanges (except for predefined ones)
- Re-declare queues
- Recover all bindings
- Recover all consumers
To enable automatic connection recovery, set ConnectionFactory.AutomaticRecoveryEnabled to true:
ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
// connection that will recover automatically
IConnection conn = factory.CreateConnection();If recovery fails due to an exception (e.g. RabbitMQ node is still not reachable), it will be retried after a fixed time interval (default is 5 seconds). The interval can be configured:
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);Topology Recovery
Topology recovery involves recovery of exchanges, queues, bindings and consumers. It is enabled by default but can be disabled:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.CreateConnection();
factory.AutomaticRecoveryEnabled = true;
factory.TopologyRecoveryEnabled = false;Manual Acknowledgements and Automatic Recovery
When manual acknowledgements are used, it is possible that network connection to RabbitMQ node fails between message delivery and acknowledgement. After connection recovery, RabbitMQ will reset delivery tags on all channels. This means that basic.ack, basic.nack, and basic.reject with old delivery tags will cause a channel exception. To avoid this, RabbitMQ .NET client keeps track of and updates delivery tags to make them monotonically growing between recoveries. IModel.BasicAck, IModel.BasicNack, andIModel.BasicReject then translate adjusted delivery tags into those used by RabbitMQ. Acknowledgements with stale delivery tags will not be sent. Applications that use manual acknowledgements and automatic recovery must be capable of handling redeliveries.
Tip:如果消费者通过EventingBasicConsumer接收消息,使用connection自动重建不会恢复消息监听。
心跳
集群各个节点之间,RabbitMQ服务和客户端之间都需要监测TCP连接,以便网络断开时执行相关处理。操作系统可能需要花点时间才会发现连接出了问题,比如Linux默认需要约11分钟后才会检测到。AMQP 0-9-1提供心跳机制以便更快的检测网络情况,同时,心跳也能防止有些网络设备关掉“idle”的TCP连接。RabbitMQ实现了这个机制( http://www.rabbitmq.com/heartbeats.html )。
心跳超时时间
服务器和客户端连接的时候会协定一个心跳超时时间,如果超过这个时间还没有收到心跳,则认为连接断开。3.5.5之前心跳的默认超时时间是580s,之后是60s。心跳包的发送间隔是timeout/2,也就是丢失2个心跳包便会认为断开。将超时时间设为0表示禁用心跳。
var cf = new ConnectionFactory();
// set the heartbeat timeout to 60 seconds
cf.RequestedHeartbeat = 60;
以上是关于RabbitMQ - 介绍的主要内容,如果未能解决你的问题,请参考以下文章