openstack nova 源码分析

Posted 胖虎是只mao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了openstack nova 源码分析相关的知识,希望对你有一定的参考价值。

1. 下载nova源码

从 github 下载 Victoria 版本的 Nova 源码

git clone https://github.com/openstack/nova.git --branch stable/victoria --single-branch

nova/ 文件夹下的目录

accelerator/    # Cyborg 加速器
api/            # Nova API 服务
cmd/            # 各个 Nova 服务的入口程序
compute/        # Nova Compute 服务
conductor/      # Nova Conductor 服务
conf/           # 所有的配置选项
console/        # nova-console 服务
db/             # 封装数据库操作
hacking/        # 编码规范检查
image/          # 封装镜像操作,Glance 接口抽象
keymgr/         # 密钥管理器实现
locale/         # 国际化相关文件
network/        # nova-network 服务
notifications/  # 通知相关功能
objects/        # 封装实体对象的 CURD 操作
pci/            # PCI/SR-IOV 支持
policies/       # 所有 Policy 的默认规则
privsep/        # oslo_privsep 相关
scheduler/      # Nova Scheduler 服务
servicegroup/   # 成员服务(membership),服务组
storage/        # Ceph 存储支持
tests/          # 单元测试
virt/           # 支持的 hypervisor 驱动
volume/         # 封装卷访问接口,Cinder 接口抽象

最新版的源码如下:

nova/ 文件夹下的 python 文件:

__init__.py
availability_zones.py   # 区域设置的工具函数
baserpc.py              # 基础 RPC 客户端/服务端实现
block_device.py         # 块设备映射
cache_utils.py          # oslo_cache 封装
config.py               # 解析命令行参数
context.py              # 贯穿 Nova 的所有请求的上下文
crypto.py               # 包装标准加密数据元素
debugger.py             # pydev 调试
exception.py            # 基础异常类
exception_wrapper.py    # 封装异常类
filters.py              # 基础过滤器
i18n.py                 # 集成 oslo_i18n
loadables.py            # 可加载类
manager.py              # 基础 Manager 类
middleware.py           # 更新 oslo_middleware 的默认配置选项
monkey_patch.py         # eventlet 猴子补丁
policy.py               # 策略引擎
profiler.py             # 调用 OSProfiler
quota.py                # 每个项目的资源配额
rpc.py                  # RPC 操作相关的工具函数
safe_utils.py           # 不会导致循环导入的工具函数
service.py              # 通用节点基类,用于在主机上运行的所有工作者
service_auth.py         # 身份认证插件
test.py                 # 单元测试基础类
utils.py                # 工具函数
version.py              # 版本号管理
weights.py              # 权重插件
wsgi.py                 # 管理 WSGI 应用的服务器类

setup.cfg 配置文件,[entry_points] 小节指定了 nova 各个组件入口

console_scripts =
    nova-api = nova.cmd.api:main
    nova-api-metadata = nova.cmd.api_metadata:main
    nova-api-os-compute = nova.cmd.api_os_compute:main
    nova-compute = nova.cmd.compute:main
    nova-conductor = nova.cmd.conductor:main
    nova-manage = nova.cmd.manage:main
    nova-novncproxy = nova.cmd.novncproxy:main
    nova-policy = nova.cmd.policy:main
    nova-rootwrap = oslo_rootwrap.cmd:main
    nova-rootwrap-daemon = oslo_rootwrap.cmd:daemon
    nova-scheduler = nova.cmd.scheduler:main
    nova-serialproxy = nova.cmd.serialproxy:main
    nova-spicehtml5proxy = nova.cmd.spicehtml5proxy:main
    nova-status = nova.cmd.status:main
wsgi_scripts =
    nova-api-wsgi = nova.api.openstack.compute.wsgi:init_application
    nova-metadata-wsgi = nova.api.metadata.wsgi:init_application

2. nova-api

nova-api 对外提供 RESTful API,没有对内的 RPC 。

nova/api/ 目录结构

__init__.py
auth.py             # 身份认证中间件
compute_req_id.py   # x-compute-request-id 中间件(oslo_middleware)
metadata/           # Metadata API
openstack/          # Nova v2.1 API
validation/         # 请求体验证
wsgi.py             # WSGI 原语(请求、应用、中间件、路由、加载器)

openstack 目录中包含 WSGI 基础架构的代码,一些 WSGI 中间件,以及如何解析请求与分发请求的核心代码。
nova/api/openstack/compute/ 包含 Controller 实现,Resource 对象将 API 映射到相应的 Controller 方法上。

__init__.py
api_version_request.py  # 版本验证
auth.py                 # noauth 中间件
common.py               # 信息查询的工具函数
compute/                # 每个 API 的入口点
identity.py             # 验证项目是否存在
requestlog.py           # 请求日志中间件
urlmap.py               # url 映射
versioned_method.py     # 版本信息
wsgi.py                 # WSGI 相关抽象类
wsgi_app.py             # WSGI 应用程序初始化方法

API 请求路由

nova-api 读取 etc/nova/api-paste.ini 并加载 WSGI 程序,最终 API 入口点都位于 nova.api.openstack.compute

[composite:osapi_compute]
use = call:nova.api.openstack.urlmap:urlmap_factory
/: oscomputeversions                # version API
/v2: oscomputeversion_legacy_v2     # v2 API
/v2.1: oscomputeversion_v2          # v2.1 API
# v21 is an exactly feature match for v2, except it has more stringent
# input validation on the wsgi surface (prevents fuzzing early on the
# API). It also provides new features via API microversions which are
# opt into for clients. Unaware clients will receive the same frozen
# v2 API feature set, but with some relaxed validation
/v2/+: openstack_compute_api_v21_legacy_v2_compatible
/v2.1/+: openstack_compute_api_v21

[composite:openstack_compute_api_v21]
use = call:nova.api.auth:pipeline_factory_v21       # 加载中间件
keystone = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler authtoken keystonecontext osapi_compute_app_v21
# DEPRECATED: The [api]auth_strategy conf option is deprecated and will be
# removed in a subsequent release, whereupon this pipeline will be unreachable.
noauth2 = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler noauth2 osapi_compute_app_v21

[app:osapi_compute_app_v21]
paste.app_factory = nova.api.openstack.compute:APIRouterV21.factory  # 入口

nova/api/openstack/compute/routes.py 中的 APIRouterV21 主要用来完成路由规则的创建,其中 ROUTE_LIST 保存了 URL 与 Controller 之间的映射关系。

APIRouterV21 基于 ROUTE_LIST,使用 Routes 模块作为 URL 映射的工具,将各个模块所实现的 API 对应的 URL 注册到 mapper 中,并把每个资源都封装成 nova.api.openstack.wsgi.Resource 对象,当解析 URL 请求时,可以通过 URL 映射找到 API 对应的 Resource 对象。

# Router 类对 WSGI routes 模块进行了简单的封装
class APIRouterV21(base_wsgi.Router):
    """Routes requests on the OpenStack API to the appropriate controller
    and method. The URL mapping based on the plain list `ROUTE_LIST` is built
    at here.
    """
    def __init__(self, custom_routes=None):
        """:param custom_routes: the additional routes can be added by this
               parameter. This parameter is used to test on some fake routes
               primarily.
        """
        super(APIRouterV21, self).__init__(nova.api.openstack.ProjectMapper())

        if custom_routes is None:
            custom_routes = tuple()

        for path, methods in ROUTE_LIST + custom_routes:
            # NOTE(alex_xu): The variable 'methods' is a dict in normal, since
            # the dict includes all the methods supported in the path. But
            # if the variable 'method' is a string, it means a redirection.
            # For example, the request to the '' will be redirect to the '/' in
            # the Nova API. To indicate that, using the target path instead of
            # a dict. The route entry just writes as "('', '/)".
            if isinstance(methods, six.string_types):
                self.map.redirect(path, methods)
                continue

            for method, controller_info in methods.items():
                # TODO(alex_xu): In the end, I want to create single controller
                # instance instead of create controller instance for each
                # route.
                controller = controller_info[0]()
                action = controller_info[1]
                self.map.create_route(path, method, controller, action)

    @classmethod
    def factory(cls, global_config, **local_config):
        """Simple paste factory, :class:`nova.wsgi.Router` doesn't have one."""
        return cls()

nova/api/wsgi.py 解析 URL 映射,通过 _dispatch 回调,调用 Resource 对象的 call 方法,最终通过请求调用 API 对应的模块中的方法。

# 路由
class Router(object):
    """WSGI middleware that maps incoming requests to WSGI apps."""

    def __init__(self, mapper):
        """Create a router for the given routes.Mapper.

        Each route in `mapper` must specify a 'controller', which is a
        WSGI app to call.  You'll probably want to specify an 'action' as
        well and have your controller be an object that can route
        the request to the action-specific method.

        Examples:
          mapper = routes.Mapper()
          sc = ServerController()

          # Explicit mapping of one route to a controller+action
          mapper.connect(None, '/svrlist', controller=sc, action='list')

          # Actions are all implicitly defined
          mapper.resource('server', 'servers', controller=sc)

          # Pointing to an arbitrary WSGI app.  You can specify the
          # path_info:.* parameter so the target app can be handed just that
          # section of the URL.
          mapper.connect(None, '/v1.0/path_info:.*', controller=BlogApp())

        """
        self.map = mapper
        # 使用 routes 模块关联 mapper 和 _dispatch
        # routes.middleware.RoutesMiddleware 设置 environ 信息
        self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                          self.map)

    @webob.dec.wsgify(RequestClass=Request)
    def __call__(self, req):
        """Route the incoming request to a controller based on self.map.

        If no match, return a 404.

        """
        # 根据 mapper 将请求路由到 WSGI 应用(资源)
        # 每个资源会在 __call__ 方法中根据 HTTP 请求的 URL 路由到对应 Controller 上的方法(Action)
        return self._router

    @staticmethod
    @webob.dec.wsgify(RequestClass=Request)
    def _dispatch(req):
        """Dispatch the request to the appropriate controller.

        Called by self._router after matching the incoming request to a route
        and putting the information into req.environ.  Either returns 404
        or the routed WSGI app's response.

        """
        # 根据 HTTP 请求的 environ 信息找到 URL 对应的 Controller
        match = req.environ['wsgiorg.routing_args'][1]
        if not match:
            return webob.exc.HTTPNotFound()
        app = match['controller']
        return app

API 实现

nova/api/openstack/compute/ 目录包含每个 API 对应的 Controller 实现,Resource 对象将请求的 API 映射到相应的 Controller 方法上。

keypairs.py (密钥对管理扩展)为例,公共方法包含 create、delete、show、index,多个实现对应不同的 Microversion(使用 @wsgi.Controller.api_version 装饰器)

@wsgi.expected_errors:API 允许的错误返回码
@validation.query_schema:请求对应的 json schema
@wsgi.response:API 请求正常返回码
@wsgi.action:注册 action

Microversion 用于实现兼容性。
nova/api/openstack/compute/schemas 包含允许的 json schema,表示接受的键值对及其类型。
通过方法接口可以得到 webob.Request 对象,从 Request 对象中可以获取其他请求参数,用于执行对应的操作。
keypairs.py

class KeypairController(wsgi.Controller):

    """Keypair API controller for the OpenStack API."""

    _view_builder_class = keypairs_view.ViewBuilder

    def __init__(self):
        super(KeypairController, self).__init__()
        self.api = compute_api.KeypairAPI()

    @wsgi.Controller.api_version("2.10")
    @wsgi.response(201)
    @wsgi.expected_errors((400, 403, 409))
    @validation.schema(keypairs.create_v210)
    def create(self, req, body):
        ...

    @wsgi.Controller.api_version("2.2", "2.9")  # noqa
    @wsgi.response(201)
    @wsgi.expected_errors((400, 403, 409))
    @validation.schema(keypairs.create_v22)
    def create(self, req, body):  # noqa
        ...

nova-conductor

使用 RPC 的子组件通常包含以下文件:

  • api.py 对 RPC 接口进行封装,类似提供 SDK
  • rpcapi.py 暴露给其他内部组件的 RPC 接口,RPC 客户端
  • manager.py 处理 RPC API 调用

nova-compute 访问数据库的操作都要由 nova-conductor 代理,用 nova/conductor/manager.py 的 ConductorManager 类完成,出于安全性考虑,nova-conductor 和 nova-compute 不能部署在同一服务器上。

nova/objects 定义了 nova object,封装数据库 CURD 操作,每个类对应数据库中的一张表。

nova-scheduler

nova-scheduler 执行调度决策,nova-compute 收集并更新主机数据,实时写入数据库(周期任务)。

nova/scheduler/filters 包含所有的过滤器实现,用于过滤不符合条件的主机;
nova/scheduler/weights 包含所有的权重实现,用于计算权重并排序。

3. 启动流程

nova-api 启动入口 nova.cmd.api:main

def main():
    config.parse_args(sys.argv) # 解析参数
    logging.setup(CONF, "nova") # 设置日志
    objects.register_all()      # 注册 nova object
    gmr_opts.set_defaults(CONF) # 设置 oslo_reports
    if 'osapi_compute' in CONF.enabled_apis:
        # NOTE(mriedem): This is needed for caching the nova-compute service
        # version.
        objects.Service.enable_min_version_cache()
    log = logging.getLogger(__name__)

    # 生成报告的机制 Guru Meditation Report (GMR)
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)

    # oslo_service.ProcessLauncher
    launcher = service.process_launcher()
    started = 0
    # 根据 paste-ini 文件创建 WSGI 应用
    for api in CONF.enabled_apis:
        should_use_ssl = api in CONF.enabled_ssl_apis
        try:
            # nova.service.WSGIService 初始化 WSGI 程序
            server = service.WSGIService(api, use_ssl=should_use_ssl)
            # oslo_service.ProcessLauncher 创建子进程启动服务
            launcher.launch_service(server, workers=server.workers or 1)
            started += 1
        except exception.PasteAppNotFound as ex:
            log.warning("%s. ``enabled_apis`` includes bad values. "
                        "Fix to remove this warning.", ex)

    if started == 0:
        log.error('No APIs were started. '
                  'Check the enabled_apis config option.')
        sys.exit(1)

    # 等待子进程终止
    launcher.wait()

nova.service.WSGIService 的初始化函数实例化 nova.wsgi.Server ,启动函数实际调用了 nova.wsgi.Server 的 start 方法。

其中的 self._socket 使用 eventlet.listen 创建,最后使用 utils 中封装的 spawn 函数启动 WSGI 程序

class Server(service.ServiceBase):
    """Server class to manage a WSGI server, serving a WSGI application."""
    
    ...

    def start(self):
        """Start serving a WSGI application.

        :returns: None
        """
        # The server socket object will be closed after server exits,
        # but the underlying file descriptor will remain open, and will
        # give bad file descriptor error. So duplicating the socket object,
        # to keep file descriptor usable.

        dup_socket = self._socket.dup()
        dup_socket.setsockopt(socket.SOL_SOCKET,
                              socket.SO_REUSEADDR, 1)
        # sockets can hang around forever without keepalive
        dup_socket.setsockopt(socket.SOL_SOCKET,
                              socket.SO_KEEPALIVE, 1)

        ...

        self._server = utils.spawn(**wsgi_kwargs)

nova-conductor 启动入口 nova.cmd.conductor:main

def main():
    config.parse_args(sys.argv)
    logging.setup(CONF, "nova")
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    objects.Service.enable_min_version_cache()

    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)

    # nova.service.Service 实例化 Service 对象
    server = service.Service.create(binary='nova-conductor',
                                    topic=rpcapi.RPC_TOPIC)
    workers = CONF.conductor.workers or processutils.get_worker_count()
    # oslo_service.launch 创建 launcher
    service.serve(server, workers=workers)
    # 调用 launcher.wait 等待子进程终止
    service.wait()

nova.service.Service 初始化函数接受 manager 对象,通过监听消息队列启用 RPC 服务;设置定期任务报告状态,并写入数据库。

  • nova-compute
  • nova-conductor
  • nova-scheduler
    RPC 服务启动时创建 rpc_client 用于发送消息,创建 rpc_server 用于接收消息,分派执行。

1. rpc_client

nova/cmd/conductor.py 实际创建 Service 实例

server = service.Service.create(binary='nova-conductor',
                                topic=rpcapi.RPC_TOPIC)

nova/service.py 初始化函数

# 创建 _driver
self.servicegroup_api = servicegroup.API()

# 动态导入 manager 类
manager_class = importutils.import_class(self.manager_class_name)

if objects_base.NovaObject.indirection_api:
    # 创建 RPCClient
    conductor_api = conductor.API()
    # 等待 nova-conductor 启动
    conductor_api.wait_until_ready(context.get_admin_context())

nova/servicegroup/api.py 创建 _driver

driver_class = _driver_name_class_mapping[CONF.servicegroup_driver]
self._driver = importutils.import_object(driver_class,
                                         *args, **kwargs)

nova/conductor/rpcapi.py 设置 rpc_client

def __init__(self):
    super(ConductorAPI, self).__init__()
    target = messaging.Target(topic=RPC_TOPIC, version='3.0')
    version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.conductor,
                                           CONF.upgrade_levels.conductor)
    serializer = objects_base.NovaObjectSerializer()
    # rpc client
    self.client = rpc.get_client(target,
                                 version_cap=version_cap,
                                 serializer=serializer)

nova/baserpc.py 设置 rpc_client

def __init__(self, topic):
    super(BaseAPI, self).__init__()
    target = messaging.Target(topic=topic,
                              namespace=_NAMESPACE,
                              version='1.0')
    version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.baseapi,
                                           CONF.upgrade_levels.baseapi)
    self.client = rpc.get_client(target, version_cap=version_cap)

2. rpc_server

nova/cmd/conductor.py 使用 Service 实例启动服务

# oslo_service.launch 创建 launcher
service.serve(server, workers=workers)
# 调用 launcher.wait 等待子进程终止
service.wait()

nova/service.py 实际调用 oslo_service 的 launch 函数,创建绿色线程(greenthread)或进程,最终调用 Service 实例的 start 方法

def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    _launcher = service.launch(CONF, server, workers=workers,
                               restart_method='mutate')

nova/service.py Service 实例的 start 方法创建 rpc_server 和 dispatcher;设置周期任务

# 创建 rpc server 以及 dispatcher
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()

...

if self.periodic_enable:
    if self.periodic_fuzzy_delay:
        initial_delay = random.randint(0, self.periodic_fuzzy_delay)
    else:
        initial_delay = None

    self.tg.add_dynamic_timer(self.periodic_tasks,
                             initial_delay=initial_delay,
                             periodic_interval_max=
                                self.periodic_interval_max)

收到消息后主要由 oslo_messaging 进行解析和处理,核心是 oslo_messaging/rpc/dispatcher.py

incoming 是 AMQP 消息格式

def dispatch(self, incoming):
    """Dispatch an RPC message to the appropriate endpoint method.

    :param incoming: incoming message
    :type incoming: IncomingMessage
    :raises: NoSuchMethod, UnsupportedVersion
    """
    message = incoming.message
    ctxt = incoming.ctxt

    method = message.get('method')
    args = message.get('args', )
    namespace = message.get('namespace')
    version = message.get('version', '1.0')

    # NOTE(danms): This event and watchdog thread are used to send
    # call-monitoring heartbeats for this message while the call
    # is executing if it runs for some time. The thread will wait
    # for the event to be signaled, which we do explicitly below
    # after dispatching the method call.
    completion_event = eventletutils.Event()
    watchdog_thread = threading.Thread(target=self._watchdog,
                                       args=(completion_event, incoming))
    if incoming.client_timeout:
        # NOTE(danms): The client provided a timeout, so we start
        # the watchdog thread. If the client is old or didn't send
        # a timeout, we just never start the watchdog thread.
        watchdog_thread.start()

    found_compatible = False
    for endpoint in self.endpoints:
        target = getattr(endpoint, 'target', None)
        if not target:
            target = self._default_target

        if not (self._is_namespace(target, namespace) and
                self._is_compatible(target, version)):
            continue

        if hasattr(endpoint, method):
            if self.access_policy.is_allowed(endpoint, method):
                try:
                    # 分派,调用函数
                    return self._do_dispatch(endpoint, method, ctxt, args)
                finally:
                    completion_event.set()
                    if incoming.client_timeout:
                        watchdog_thread.join()

        found_compatible = True

    if found_compatible:
        raise NoSuchMethod(method)
    else:
        raise UnsupportedVersion(version, method=method)

oslo_messaging/rpc/dispatcher.py 调用函数

def _do_dispatch(self, endpoint, method, ctxt, args):
    ctxt = self.serializer.deserialize_context(ctxt)
    new_args = dict()
    for argname, arg in args.items():
        new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
    func = getattr(endpoint, method)
    result = func(ctxt, **new_args以上是关于openstack nova 源码分析的主要内容,如果未能解决你的问题,请参考以下文章

openstack nova 源码分析

Openstack liberty源码分析 之 云主机的启动过程3

Openstack liberty源码分析 之 云主机的启动过程2

nova创建虚拟机流程源码系列分析之二 wsgi模型

Openstack liberty 创建实例快照源码分析2

Openstack liberty中nova-compute服务的启动过程