深度分析RabbitMQ在OpenStack中的实现

Posted 苏研大云人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度分析RabbitMQ在OpenStack中的实现相关的知识,希望对你有一定的参考价值。


“消息实现分布式服务”并不是陌生的话题,而在当前比较热门的OpenStack云平台下,RabbitMQ这种消息代理是如何设计与实现的呢?本文将带你探究其中的真谛...

深度分析RabbitMQ在OpenStack中的实现

消息实现分布式服务,在当前已经成为一种必然趋势。本文主要分享OpenStack项目中的消息是如何实现的。实现消息队列的框架有很多,当前比较热门的有RabbitMQ、Kafka等,而在OpenStack项目中,目前社区及各界更加倾向于使用RabbitMQ。因此,本文围绕RabbitMQ进行阐述。

深度分析RabbitMQ在OpenStack中的实现


12

RabbitMQ

是基于erl实现的一套框架,OpenStack中使用的消息队列是RabbitMQ的rpc模块。为了保证消息的高可靠性,通常RabbitMQ会以集群的形式呈现。下面简要叙述一下RabbitMQ的基本原理,RabbitMQ作为消息中间件,负责将消息从生产端准确传递到消费端,并且如果有消息确认机制,即生产者需要知道消息的处理状态,此时消费者会通过RabbitMQ将处理完成的消息,回传给生产者。这样的实现,依赖于RabbitMQ中的核心概念,包括Producer、Consumer、Queue、Exchange、Routing Key、Binding Key等等,这些基础概念,在Rabbitmq官网可以查到,在本文不作赘述。



有了上述RabbitMQ的基础,我们现在来介绍一下OpenStack项目中与消息相关的几个重要概念。





12

   Target

指明消息的目标对象,包括消息路由器、命名空间、消息主题、版本等等信息。


12

   Transport

消息通道,负责将消息传递给设定的后端驱动进行消息处理。



在OpenStack项目中,任何需要通过RabbitMQ实现消息交互的服务或者模块,都会在服务启动的过程中,初始化相关的RabbitMQ参数,然后获取rpc的远程服务对象,并启动服务流程。这个过程中,涉及的rpc核心代码如下:


conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                            topic=target.topic,
                            callback=listener)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                            topic='%s.%s' % (target.topic,
                                             target.server),
                            callback=listener)
conn.declare_fanout_consumer(target.topic, listener)



深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

在解释这段代码之前,首先说明一下消息转发的类型:Exchange Type,主要包括fanout,direct,topic。fanout类型的规则,非常简单,它会将发送到该Exchange上的消息,转发到所有与它绑定的Queue中;direct类型的规则,它会将消息转发到该Exchange的bing key和routing key完全匹配的Queue中;topic类型的规则,支持模式匹配,会根据routing key与binding key的匹配度进行消息投递。routing key和binding key是通过类似域名这种方式呈现,如cmss.chinamobile.com,支持的通配符包括‘#’(表示模糊匹配,>=0个字符) 和‘*’(匹配一个单词)




深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

这样,上述代码的含义是声明2个topic类型的服务进程及1个fanout类型的服务进程,服务端没有使用direct方式的服务进程。服务开启的消息进程,指明了exchange-name、topic(可以理解为路由键值),这些值会在各服务启动过程中,通过初始化Target对象时进行设定,如nova、nova-conductor;另外,还指定了callback这个回调方法,这是监听到有消息传递过来的处理方法,即服务端的消息处理流程。




深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

除此之外,我们还关心队列是如何设计的。在OpenStack的服务消息体系中,topic的类型,队列的名称同Target中指定的topic,在声明消费者的时候,会通过Kombu的entity文件中进行初始化、声明,并同exchange进行绑定;fanout类型,队列的名称在Target中的指定的topic的关键字的后面加上_fanout,如nova-conductor_fanout;而direct类型,队列的名称是reply_+msg_id,其中msg_id是随机生成的。



有了这三类队列后,那么它们究竟是如何使用的呢?前面我们已知,服务在启动过程中,并没用启用direct类型的消费者进程,这样的设计是和RabbitMQ中的队列的特性相关。在RabbitMQ中,direct 和 fanout类型的消息,是不能持久化存储,并且会自动删除;而topic类型的消息,可以根据用户的需求,设定是否需要持久化存储,及自动删除。fanout类型的消息,会发送到所有相关的exchange的队列中,这主要是用在服务状态的同步上,如cinder-volume服务,会根据设定的间隔时间,不断向cinder-scheduler的fanout队列发送服务状态消息,然后scheduler会接收到信号,从而知道cinder-volume的状态。direct类型的消息,主要是用于客户端发送请求,等待同步响应的操作,服务端处理完用户请求后,将结果以direct的方式返回给客户端。


下面,我们从OpenStack实现消息框架的层面,进一步深入分析,如下图一所示。





深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

在OpenStack的项目中,每个项目会有多个服务,这些服务之间的交互方式,通常使用消息实现,从而实现分布式处理。每个项目都存在一个service类对象,用于服务的启动、关闭,在服务中设定了消息的关键字topic、服务特征值binary等。在service启动过程中,首先初始化Target对象(初始化消息的基础信息,包括路由名称等),然后通过rpc模块,获取消息处理服务对象MessageHandlingServer,这个对象中包含经过初始化的消息通道transport,rpc调度器dispatcher(负责消息的监听与消息的响应模块调度),及消息的构造器对象executor(负责消息进程的启动、监听、消息的管理等),它还包括了start方法,用于启动服务的消息进程。在启动过程中,初始化消息的上下文序列化对象RequestContextSerializer,并调用消息构造器EventletExecutor实现监听及消息处理进程。当前消息的构造方式有2种,分别是eventlet、blocking。这两种方式的差异是,blocking的模式是阻塞的方式,eventlet采用协程,而OpenStack目前采用的是eventlet的方式。



深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

当消息构造器对象初始化完成,接下去进行消息的监听。主要通过transport对象,将监听的动作传递给后端驱动,在本文指定的是RabbitMQ,因此对应的驱动是RabbitDriver。而当前在OpenStack项目已经实现的消息框架还包括Zmq、Qpid等。RabbitDriver监听的内容,就是一开始本文介绍的服务启动rpc模块的核心模块,主要是启动2个topic类型的服务进程,及1个fanout类型的服务进程。除此之外,服务的消息队列连接池,是服务性能的一个重要体现,在OpenStack的项目中,通过rpc_conn_pool_size来进行设定,默认的连接池大小是30。原则上,这个值越大越好,但是由于机器内存上限,在OpenStack项目中,不同项目需要综合评估,从而设定消息服务连接池的数量。



到目前为止,服务端的消息进程已经介绍完成。下面我们从客户端发送消息角度,进一步介绍OpenStack中消息是如何发送完成的。


深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

客户端首先初始化target对象,初始化消息序列化对象(用于识别消息),然后再创建消息通道transport,客户端发出rpc的请求,即调用cast、call方法,通常如下两条语句:


       cctxt.cast(ctxt, 'attach_volume', **kw)
    cctxt.call(ctxt, 'get_console_topic')


深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

其中cctxt是指客户端的连接上下文对象,即_CallContext,是指已经初始化target、transport、序列化对象、版本号、连接次数等信息的对象。这个对象的cast、call方法的区别主要体现在是否需要确认消息处理机制上。cast方法是异步消息处理,客户端不需要等到服务端的响应结果;call方法是同步消息处理,客户端会一直阻塞在消息等待过程中。




深度分析RabbitMQ在OpenStack中的实现


深度分析RabbitMQ在OpenStack中的实现

通过上述这2个方法发送的消息,会继续由transport通道,传递给后端消息驱动Rabbit Driver的send进行处理。Send方法的具体过程中,需要提及一点,当请求是同步消息,则此时会创建direct类型的消息通道,主要用于等待发出的请求,而服务端是如何知道消息是同步还是异步的呢?这一点很巧妙,就是在此处发送请求的时候,如果有msg_id,服务端就认为是同步请求,服务端在处理完消息后,及时发送2个返回消息,其中一个是消息的处理结果,另一个是消息处理完成的消息,这样客户端收到响应后,会退出消息阻塞继续执行;而如果没有msg_id的消息,这是一个异步处理的消息,服务端处理完成后,并不会向客户端响应结果。



深度分析RabbitMQ在OpenStack中的实现


通过以上分析,我们已经了解RabbitMQ在OpenStack项目的实现架构,通常同一个项目中不同组件之间,会指定同样的exchange名称,这样在发送fanout类型的消息,处理更加方便;而direct类型的exchange,依据同步请求的消息数量。并且我们能够认识到消息在发送之前,首先服务端得先创建好消息服务进程,否则会导致消息丢失。当服务端的消息进程接收到消息,会通过callback中指定的函数进行处理,最后将处理结果返回给客户端。






以上是关于深度分析RabbitMQ在OpenStack中的实现的主要内容,如果未能解决你的问题,请参考以下文章

OpenStack中的rabbitmq的配置方法

UOS 4.0 - RabbitMQ 参数调优分析

消息队列基础 RabbitMQ与AMQP协议详解——超大规模高可用OpenStack核心技术深入解析系列

RabbitMQ学习记录- 消息队列存储机制源码分析

openstack详解——openstack rabbitmq安装与启动

在OpenStack中运行RabbitMQ的最佳实践