neutron-server的启动流程

Posted gj4990

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了neutron-server的启动流程相关的知识,希望对你有一定的参考价值。

neutron-server的启动包括RPC-server的创建,RPC-client的创建,WSGI server的创建,因此neutron-server不单单起到与其他组件中的api的功能。本文将RPC相关创建和WSGI server的创建两方面进行代码流程的分析。

查看setup.cfg文件找到neutron-server的代码入口。

neutron-server = neutron.cmd.eventlet.server:main

#/neutron/cmd/eventlet/server/__init__.py
def main():
    server.main()

#/neutron/server/__init__.py
def main():
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    try:
        pool = eventlet.GreenPool()

        neutron_api = service.serve_wsgi(service.NeutronApiService)
        api_thread = pool.spawn(neutron_api.wait)

        try:
            neutron_rpc = service.serve_rpc()
        except NotImplementedError:
            LOG.info(_LI("RPC was already started in parent process by "
                         "plugin."))
        else:
            rpc_thread = pool.spawn(neutron_rpc.wait)

            # api and rpc should die together.  When one dies, kill the other.
            rpc_thread.link(lambda gt: api_thread.kill())
            api_thread.link(lambda gt: rpc_thread.kill())

        pool.waitall()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)

1. WSGIserver创建

neutron_api = service.serve_wsgi(service.NeutronApiService)

#/neutron/service.py
def serve_wsgi(cls):

    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))

    return service

#/neutron/service.py:NeutronApiService
class NeutronApiService(WsgiService):
    """Class for neutron-api service."""

    @classmethod
    def create(cls, app_name='neutron'):

        # Setup logging early, supplying both the CLI options and the
        # configuration mapping from the config file
        # We only update the conf dict for the verbose and debug
        # flags. Everything else must be set up in the conf file...
        # Log the options used when starting if we're in debug mode...

        config.setup_logging()
        # Dump the initial option values
        cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
        service = cls(app_name)
        return service

serve_wsgi函数将传入的NeutronApiService对象进行创建,并执行该对象的start函数,创建WSGI sever。

#/neutron/service.py:WsgiService
    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

#/neutron/service.py
def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=cfg.CONF.api_workers)
    # Dump all option values here after all options are parsed
    cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port)
    return server

其中,

app = config.load_paste_app(app_name)

为从api-paste.ini文件加载composite为neutron的相关信息。首先创建/neutron/wsgi.py:Server对象,执行Server对象的start函数,将创建WSGI server。

#/neutron/wsgi.py:Server
    def start(self, application, port, host='0.0.0.0', workers=0):
        """Run a WSGI server with the given application."""
        self._host = host
        self._port = port
        backlog = CONF.backlog

        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)

        self._launch(application, workers)

    def _launch(self, application, workers=0):
        service = WorkerService(self, application)
        if workers < 1:
            # The API service should run in the current process.
            self._server = service
            service.start()
            systemd.notify_once()
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            if CONF.database.connection:
                api.get_engine().pool.dispose()
            # The API service runs in a number of child processes.
            # Minimize the cost of checking for child exit by extending the
            # wait interval past the default of 0.01s.
            self._server = common_service.ProcessLauncher(wait_interval=1.0)
            self._server.launch_service(service, workers=workers)

其中,start函数中的host即为neutron.conf配置文件中bind_host参数,一般设置为管理网络IP,port为neutron.conf文件中的bind_port参数,一般neutron-server的bind_port参数设置为9696。首先在start函数调用self._get_socket函数创建一个socket去监听本机的9696端口。以下是我的OpenStack环境的neutron-server进程。

[root@jun neutron]# netstat -tnulp | grep 9696

tcp        0      0 192.168.118.1:9696      0.0.0.0:*               LISTEN      35298/python2

然后在_lauch函数中创建WSGI server进程(即创建子进程用于处理neutronclient发来的HTTP请求)。这里WSGI server进程的个数根据neutron.conf配置文件中的api_workers进行指定,一般为系统cpu的个数。

具体的创建WSGI server进程的代码分析可以参看《 keystone WSGI流程》文章。下面分析最终根据api-paste.ini文件加载的可调用的resources。api-paste.ini文件内容如下。

[composite:neutron]

use = egg:Paste#urlmap

/: neutronversions

/v2.0: neutronapi_v2_0

 

[composite:neutronapi_v2_0]

use = call:neutron.auth:pipeline_factory

noauth = request_id catch_errors extensions neutronapiapp_v2_0

keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

 

[filter:request_id]

paste.filter_factory = oslo.middleware:RequestId.factory

 

[filter:catch_errors]

paste.filter_factory = oslo.middleware:CatchErrors.factory

 

[filter:keystonecontext]

paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

 

[filter:authtoken]

paste.filter_factory = keystonemiddleware.auth_token:filter_factory

 

[filter:extensions]

paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

 

[app:neutronversions]

paste.app_factory = neutron.api.versions:Versions.factory

 

[app:neutronapiapp_v2_0]

paste.app_factory = neutron.api.v2.router:APIRouter.factory

composite为neutron时,有两个分支,一个是返回neutronversions的分支,一个是到composite为neutronapi_v2_0去调用相应的资源。我们发送HTTP请求到neutron,一般会调用/v2.0上的资源。比如create_port, update_port等等。查看composite为neutronapi_v2_0的部分,不管是noauth还是keystone,最终都将到app为neutronapiapp_v2_0的分支去加载资源。其中factory函数如下。

#/neutron/api/v2/router.py:APIRouter
class APIRouter(wsgi.Router):

    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        plugin = manager.NeutronManager.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            allow_pagination = cfg.CONF.allow_pagination
            allow_sorting = cfg.CONF.allow_sorting
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=allow_pagination,
                allow_sorting=allow_sorting)
            path_prefix = None
            if parent:
                path_prefix = "/%s/%s_id/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)

1.1 创建plugin(包括core plugin和service plugin)

#/neutron/api/v2/router.py:APIRouter
plugin = manager.NeutronManager.get_plugin()

#/neutron/manager.py:NeutronManager
    @classmethod
    def get_plugin(cls):
        # Return a weakref to minimize gc-preventing references.
        return weakref.proxy(cls.get_instance().plugin)

#/neutron/manager.py:NeutronManager
    @classmethod
    def get_instance(cls):
        # double checked locking
        if not cls.has_instance():
            cls._create_instance()
        return cls._instance

#/neutron/manager.py:NeutronManager
    @classmethod
    @utils.synchronized("manager")
    def _create_instance(cls):
        if not cls.has_instance():
            cls._instance = cls()

#/neutron/manager.py:NeutronManager
class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    The caller should make sure that NeutronManager is a singleton.
    """
    _instance = None

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = 

        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info(_LI("Loading core plugin: %s"), plugin_provider)
        self.plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                                plugin_provider)
        msg = validate_post_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = constants.CORE: self.plugin
        self._load_service_plugins()

get_plugin函数返回NeutronManager的plugin(core plugin)的弱引用,那么core plugin是什么对象呢?core plugin和service plugin在NeutronManager的__init__函数创建的。其中core plugin和service plugin分别根据neutron.conf配置文件中的core_plugin参数和service_plugins参数进行创建。

1.1.1 core plugin的创建

我的OpenStack环境的neutron.conf配置文件中的core_plugin参数值为:

core_plugin =neutron.plugins.ml2.plugin.Ml2Plugin

其创建在_get_plugin_instance函数中执行。

#/neutron/manager.py:NeutronManager
    def _get_plugin_instance(self, namespace, plugin_provider):
        try:
            # Try to resolve plugin by name
            mgr = driver.DriverManager(namespace, plugin_provider)
            plugin_class = mgr.driver
        except RuntimeError as e1:
            # fallback to class name
            try:
                plugin_class = importutils.import_class(plugin_provider)
            except ImportError as e2:
                LOG.exception(_LE("Error loading plugin by name, %s"), e1)
                LOG.exception(_LE("Error loading plugin by class, %s"), e2)
                raise ImportError(_("Plugin not found."))
        return plugin_class()

利用stevedore模块创建core_plugin,即core_plugin为/neutron/plugins/ml2/plugin.py:Ml2Plugin对象。

#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    def __init__(self):
        # First load drivers, then initialize DB, then initialize drivers
        self.type_manager = managers.TypeManager()
        self.extension_manager = managers.ExtensionManager()
        self.mechanism_manager = managers.MechanismManager()
        super(Ml2Plugin, self).__init__()
        self.type_manager.initialize()
        self.extension_manager.initialize()
        self.mechanism_manager.initialize()

        self._setup_rpc()

        # REVISIT(rkukura): Use stevedore for these?
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver
        )

        self.start_periodic_dhcp_agent_status_check()
        LOG.info(_LI("Modular L2 Plugin initialization complete"))

由于type_manager,extension_manager和mechanism_manager的创建都类似,所以这里我们主要分析type_manager代码流程,其余的简要说明。

#/neutron/plugins/ml2/managers.py:TypeManager
class TypeManager(stevedore.named.NamedExtensionManager):
    """Manage network segment types using drivers."""

    def __init__(self):
        # Mapping from type name to DriverManager
        self.drivers = 

        LOG.info(_LI("Configured type driver names: %s"),
                 cfg.CONF.ml2.type_drivers)
        super(TypeManager, self).__init__('neutron.ml2.type_drivers',
                                          cfg.CONF.ml2.type_drivers,
                                          invoke_on_load=True)
        LOG.info(_LI("Loaded type driver names: %s"), self.names())
        self._register_types()
        self._check_tenant_network_types(cfg.CONF.ml2.tenant_network_types)

这里,TypeManager对象中的drivers即为type driver对象。drivers根据/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的type_drivers参数去构造。在我的OpenStack环境中type_drivers参数值如下。

type_drivers = vlan,flat

两个driver对象的创建在TypeManager类的父类中完成。且创建两个对象被/stevedore/extension.py:Extension包裹。

#/neutron/plugins/ml2/managers.py:TypeManager
    def _register_types(self):
        for ext in self:
            network_type = ext.obj.get_type()
            if network_type in self.drivers:
                LOG.error(_LE("Type driver '%(new_driver)s' ignored because"
                              " type driver '%(old_driver)s' is already"
                              " registered for type '%(type)s'"),
                          'new_driver': ext.name,
                           'old_driver': self.drivers[network_type].name,
                           'type': network_type)
            else:
                self.drivers[network_type] = ext
        LOG.info(_LI("Registered types: %s"), self.drivers.keys())

这里,将创建完成的typedriver对象register到self.drivers字典中,如下。

self.drivers: 'flat': <stevedore.extension.Extension object at 0x428cad0>, 'vlan': <stevedore.extension.Extension object at 0x4299150>

如果调用实际的flat或vlan对于的type driver的函数,需要访问/stevedore/extension.py:Extension对象的obj成员变量,obj成员变量即为实际的type driver的对象。如下

‘flat’: <neutron.plugins.ml2.drivers.type_flat.FlatTypeDriver object at 0x46cfb50>

‘vlan’: <neutron.plugins.ml2.drivers.type_vlan.VlanTypeDriver object at 0x46a9bd0>

在register typedriver后,check tenant的network type是否在我们所register的type driver中,如果没有,则raise异常。其中tenant的network type是由/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的tenant_network_types参数设置。我的OpenStack环境的tenant_network_types参数配置如下。

tenant_network_types = vlan

#/neutron/plugins/ml2/managers.py:TypeManager
    def _check_tenant_network_types(self, types):
        self.tenant_network_types = []
        for network_type in types:
            if network_type in self.drivers:
                self.tenant_network_types.append(network_type)
            else:
                LOG.error(_LE("No type driver for tenant network_type: %s. "
                              "Service terminated!"), network_type)
                raise SystemExit(1)
        LOG.info(_LI("Tenant network_types: %s"), self.tenant_network_types)

所以执行_check_tenant_network_types函数时,tenant network type校验成功。

mechanism_manager的创建也是类似的,它管理的mechanism driver。其根据/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的mechanism_drivers参数去构造mechanism driver对象。我的OpenStack环境mechanism_drivers配置参数如下。

mechanism_drivers =linuxbridge

构建的mechanismdriver对象为:

<neutron.plugins.ml2.drivers.mech_linuxbridge.LinuxbridgeMechanismDriver object at 0x3f42b90>

extension_manager的创建也类似。

在type_manager,extension_manager和mechanism_manager创建完成后,执行initialize函数对所创建的driver进行初始化。比如type_manager的initialize函数如下。

#/neutron/plugins/ml2/managers.py:TypeManager
    def initialize(self):
        for network_type, driver in self.drivers.iteritems():
            LOG.info(_LI("Initializing driver for type '%s'"), network_type)
            driver.obj.initialize()

从上可以看出,最终会调用type_manager所管理的driver的initialize函数。

下面分析创建rpc-client的代码流程。即执行以下代码。

self._setup_rpc()

 

#/neutron/plugins/ml2/managers.py:TypeManager
    def _setup_rpc(self):
        self.notifier = rpc.AgentNotifierApi(topics.AGENT)
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
        )

这里topics.AGENT=’q-agent-notifier’。

#/neutron/plugins/ml2/rpc.py:AgentNotifierApi
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                       sg_rpc.SecurityGroupAgentRpcApiMixin,
                       type_tunnel.TunnelAgentRpcApiMixin):
    """Agent side of the openvswitch rpc API.

    API version history:
        1.0 - Initial version.
        1.1 - Added get_active_networks_info, create_dhcp_port,
              update_dhcp_port, and removed get_dhcp_port methods.

    """

    def __init__(self, topic):
        self.topic = topic
        self.topic_network_delete = topics.get_topic_name(topic,
                                                          topics.NETWORK,
                                                          topics.DELETE)
        self.topic_port_update = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.UPDATE)
        self.topic_port_delete = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.DELETE)

        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/common/topics.py
def get_topic_name(prefix, table, operation, host=None):
    """Create a topic name.

    The topic name needs to be synced between the agent and the
    plugin. The plugin will send a fanout message to all of the
    listening agents so that the agents in turn can perform their
    updates accordingly.

    :param prefix: Common prefix for the plugin/agent message queues.
    :param table: The table in question (NETWORK, SUBNET, PORT).
    :param operation: The operation that invokes notification (CREATE,
                      DELETE, UPDATE)
    :param host: Add host to the topic
    :returns: The topic name.
    """
    if host:
        return '%s-%s-%s.%s' % (prefix, table, operation, host)
    return '%s-%s-%s' % (prefix, table, operation)

其中/neutron/common/topics.py文件中定义了一些常量如下。

#/neutron/common/topics.py

NETWORK = 'network'

SUBNET = 'subnet'

PORT = 'port'

SECURITY_GROUP = 'security_group'

L2POPULATION = 'l2population'

DVR = 'dvr'

 

CREATE = 'create'

DELETE = 'delete'

UPDATE = 'update'

 

AGENT = 'q-agent-notifier'

PLUGIN = 'q-plugin'

L3PLUGIN = 'q-l3-plugin'

DHCP = 'q-dhcp-notifer'

FIREWALL_PLUGIN = 'q-firewall-plugin'

METERING_PLUGIN = 'q-metering-plugin'

LOADBALANCER_PLUGIN = 'n-lbaas-plugin'

 

L3_AGENT = 'l3_agent'

DHCP_AGENT = 'dhcp_agent'

METERING_AGENT = 'metering_agent'

LOADBALANCER_AGENT = 'n-lbaas_agent'

所以AgentNotifierApi对象创建了通过AMQP到topic为’q-agent-notifier’的RPC-server的绑定。即AgentNotifierApi可远程调用执行topic为’q-agent-notifier’的RPC-server上的函数。不过这里,并没有纯粹的调用topic为’q-agent-notifier’的RPC-server函数。而是通过prepare函数修改topic构建新的Target来调用3种另外的topic的RPC-server上的函数。这3种topic为(根据get_topic_name函数构造出来的):

‘q-agent-notifier-network-delete’

‘q-agent-notifier-port-update’

‘q-agent-notifier-port-delete’

那么对应的这3个topic的RPC-server是哪个服务构建的呢?通过AgentNotifierApi类的解释可知,应该是neutron-openvswitch-agent服务构建的。如下

#/neutron/plugins/opensvswitch/agent/ovs_neutron_agent.py:OVSNeutronAgent
    def setup_rpc(self):
        self.agent_id = 'ovs-agent-%s' % cfg.CONF.host
        self.topic = topics.AGENT
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
        self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.endpoints = [self]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.PORT, topics.DELETE],
                     [topics.NETWORK, topics.DELETE],
                     [constants.TUNNEL, topics.UPDATE],
                     [constants.TUNNEL, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE],
                     [topics.DVR, topics.UPDATE]]
        if self.l2_pop:
            consumers.append([topics.L2POPULATION,
                              topics.UPDATE, cfg.CONF.host])
        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                     self.topic,
                                                     consumers,
                                                     start_listening=False)

#/neutron/agent/rpc.py
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
    """Create agent RPC consumers.

    :param endpoints: The list of endpoints to process the incoming messages.
    :param prefix: Common prefix for the plugin/agent message queues.
    :param topic_details: A list of topics. Each topic has a name, an
                          operation, and an optional host param keying the
                          subscription to topic.host for plugin calls.
    :param start_listening: if True, it starts the processing loop

    :returns: A common Connection.
    """

    connection = n_rpc.create_connection(new=True)
    for details in topic_details:
        topic, operation, node_name = itertools.islice(
            itertools.chain(details, [None]), 3)

        topic_name = topics.get_topic_name(prefix, topic, operation)
        connection.create_consumer(topic_name, endpoints, fanout=True)
        if node_name:
            node_topic_name = '%s.%s' % (topic_name, node_name)
            connection.create_consumer(node_topic_name,
                                       endpoints,
                                       fanout=False)
    if start_listening:
        connection.consume_in_threads()
    return connection

从上面的代码可以看出,neutron-openvswitch-agent构建的RPC-server将会最多有8个topic。即

‘q-agent-notifier-port-update’

‘q-agent-notifier-port-delete’

‘q-agent-notifier-network-delete’

‘q-agent-notifier-tunnel-update’

‘q-agent-notifier-tunnel-delete’

‘q-agent-notifier-security_group-update’

‘q-agent-notifier-dvr-update’

‘q-agent-notifier-l2population–update.jun2’

从上面8个topic可以看出,包括AgentNotifierApi类所构建RPC-client所需的topic。

不过,我的OpenStack环境采用的linuxbridge的mechanism driver,而在neutron-linuxbridge-agent服务启动时,创建RPC-server时,没有topic’ q-agent-notifier-port-delete’Target,所以采用的linuxbridgemechanism driver时,AgentNotifierApi对象不能调用port_delete函数。如下

#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
    def setup_rpc(self, physical_interfaces):
        if physical_interfaces:
            mac = utils.get_interface_mac(physical_interfaces[0])
        else:
            devices = ip_lib.IPWrapper().get_devices(True)
            if devices:
                mac = utils.get_interface_mac(devices[0].name)
            else:
                LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                              "Agent terminated!"))
                exit(1)
        self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
        LOG.info(_LI("RPC agent_id: %s"), self.agent_id)

        self.topic = topics.AGENT
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        # RPC network init
        # Handle updates from service
        self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
                                                  self.sg_agent)]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.NETWORK, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE]]
        if cfg.CONF.VXLAN.l2_population:
            consumers.append([topics.L2POPULATION,
                              topics.UPDATE, cfg.CONF.host])
        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                     self.topic,
                                                     consumers)
        report_interval = cfg.CONF.AGENT.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)

下面分析另一个跟dhcp相关的RPC-client的创建。

        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (

            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()

        )

其中self.agent_notifiers属性是从父类/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin继承而来。

#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = 
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    

DhcpAgentNotifyAPI对象的创建如下。

#/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py:DhcpAgentNotifyAPI
class DhcpAgentNotifyAPI(object):
    """API for plugin to notify DHCP agent.

    This class implements the client side of an rpc interface.  The server side
    is neutron.agent.dhcp_agent.DhcpAgent.  For more information about changing
    rpc interfaces, please see doc/source/devref/rpc_api.rst.
    """
    # It seems dhcp agent does not support bulk operation
    VALID_RESOURCES = ['network', 'subnet', 'port']
    VALID_METHOD_NAMES = ['network.create.end',
                          'network.update.end',
                          'network.delete.end',
                          'subnet.create.end',
                          'subnet.update.end',
                          'subnet.delete.end',
                          'port.create.end',
                          'port.update.end',
                          'port.delete.end']

    def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
        self._plugin = plugin
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

其中RPC-client的topic的值为’dhcp_agent’。而对应的RPC-server是在neutron-dhcp-agent服务启动时创建的。

#neutron/agent/dhcp_agent.py
def main():
    register_options()
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-dhcp-agent',
        topic=topics.DHCP_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
    service.launch(server).wait()

#neutron/service.py:Service
    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

#neutron/common/rpc.py:Service
    def start(self):
        super(Service, self).start()

        self.conn = create_connection(new=True)
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

总结一下,上面的coreplugin中的RPC-client与RPC-server的对应关系。

RPC-client

RPC-server

Class

service

endpoints

topic

service

AgentNotifierApi

neutron-server

(core plugin)

OVSNeutronAgent

q-agent-notifier-xxx-yyy

(topics.AGENT-xxx-yyy)

neutron-openvswitch-agent

LinuxBridgeRpcCallbacks

q-agent-notifier-xxx-yyy

(topics.AGENT-xxx-yyy)

neutron-linuxbridge-agent

DhcpAgentNotifyAPI

DhcpAgentWithStateReport

(inherit from DhcpAgent)

dhcp_agent

(topics.DHCP_AGENT)

neutron-dhcp-agent

其中,xxx表示资源(如port,network等),yyy表示操作(如update,delete等)。

至此_setup_rpc函数分析完成。

Ml2Plugin的__init__函数还剩下以下代码。

#neutron/plugins/ml2/plugin.py:Ml2Plugin.__init__
        # REVISIT(rkukura): Use stevedore for these?
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver
        )

        self.start_periodic_dhcp_agent_status_check()
        LOG.info(_LI("Modular L2 Plugin initialization complete"))

network_scheduler_driver是在neutron.conf配置文件中的参数,其默认值为:

network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler

所以self.network_scheduler为/neutron/scheduler/dhcp_agent_scheduler.py:ChanceScheduler对象。

start_periodic_dhcp_agent_status_check函数是一个周期性检测函数。

目前把core plugin的代码流程分析完成。下面分析service plugin的代码流程。

1.1.2 service plugin的创建

#/neutron/manager.py:NeutronManager.__init__
        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = constants.CORE: self.plugin
        self._load_service_plugins()

service plugin首先将core plugin载入到self.service_plugins中,然后执行self._load_service_plugins函数将自身的service plugin载入到self.service_plugins中。

#/neutron/manager.py:NeutronManager
    def _load_service_plugins(self):
        """Loads service plugins.

        Starts from the core plugin and checks if it supports
        advanced services then loads classes provided in configuration.
        """
        # load services from the core plugin first
        self._load_services_from_core_plugin()

        plugin_providers = cfg.CONF.service_plugins
        LOG.debug("Loading service plugins: %s", plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue

            LOG.info(_LI("Loading Plugin: %s"), provider)
            plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                    provider)

            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            if plugin_inst.get_plugin_type() in self.service_plugins:
                raise ValueError(_("Multiple plugins for service "
                                   "%s were configured") %
                                 plugin_inst.get_plugin_type())

            self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

            # search for possible agent notifiers declared in service plugin
            # (needed by agent management extension)
            if (hasattr(self.plugin, 'agent_notifiers') and
                    hasattr(plugin_inst, 'agent_notifiers')):
                self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)

            LOG.debug("Successfully loaded %(type)s plugin. "
                      "Description: %(desc)s",
                      "type": plugin_inst.get_plugin_type(),
                       "desc": plugin_inst.get_plugin_description())

首先从core plugin中load service plugin。

#/neutron/manager.py:NeutronManager
    def _load_services_from_core_plugin(self):
        """Puts core plugin in service_plugins for supported services."""
        LOG.debug("Loading services supported by the core plugin")

        # supported service types are derived from supported extensions
        for ext_alias in getattr(self.plugin,
                                 "supported_extension_aliases", []):
            if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
                service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
                self.service_plugins[service_type] = self.plugin
                LOG.info(_LI("Service %s is supported by the core plugin"),
                         service_type)

其中EXT_TO_SERVICE_MAPPING的信息如下。

#/neutron/plugins/common/constants.py
# service type constants:
CORE = "CORE"
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
LOADBALANCERV2 = "LOADBALANCERV2"
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"


#maps extension alias to service type
EXT_TO_SERVICE_MAPPING = 
    'dummy': DUMMY,
    'lbaas': LOADBALANCER,
    'lbaasv2': LOADBALANCERV2,
    'fwaas': FIREWALL,
    'vpnaas': VPN,
    'metering': METERING,
    'router': L3_ROUTER_NAT

即从coreplugin(Ml2Plugin对象)的supported_extension_aliases函数返回的alias查找是否有EXT_TO_SERVICE_MAPPING相匹配的alias,如果有,则将core plugin载入到service plugin中。

#/neutron/plugins/ml2/plugin.py:Ml2Plugin
    # List of supported extensions
    _supported_extension_aliases = ["provider", "external-net", "binding",
                                    "quotas", "security-group", "agent",
                                    "dhcp_agent_scheduler",
                                    "multi-provider", "allowed-address-pairs",
                                    "extra_dhcp_opt", "subnet_allocation",
                                    "net-mtu", "vlan-transparent"]

    @property
    def supported_extension_aliases(self):
        if not hasattr(self, '_aliases'):
            aliases = self._supported_extension_aliases[:]
            aliases += self.extension_manager.extension_aliases()
            sg_rpc.disable_security_group_extension_by_config(aliases)
            vlantransparent.disable_extension_by_config(aliases)
            self._aliases = aliases
        return self._aliases

supported_extension_aliases函数将_supported_extension_aliases列表的alias赋给self._aliases变量。不过_supported_extension_aliases列表有13个alias,在supported_extension_aliases函数会判断是否移除’security-group’和’vlan-transparent’这两个alias。拿security-group举例说明。

#/neutron/agent/securitygroups_rpc.py
#This is backward compatibility check for Havana
def _is_valid_driver_combination():
    return ((cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver and
              cfg.CONF.SECURITYGROUP.firewall_driver !=
             'neutron.agent.firewall.NoopFirewallDriver')) or
            (not cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver ==
             'neutron.agent.firewall.NoopFirewallDriver' or
              cfg.CONF.SECURITYGROUP.firewall_driver is None)
             ))

def is_firewall_enabled():
    if not _is_valid_driver_combination():
        LOG.warn(_LW("Driver configuration doesn't match with "
                     "enable_security_group"))

    return cfg.CONF.SECURITYGROUP.enable_security_group

def _disable_extension(extension, aliases):
    if extension in aliases:
        aliases.remove(extension)

def disable_security_group_extension_by_config(aliases):
    if not is_firewall_enabled():
        LOG.info(_LI('Disabled security-group extension.'))
        _disable_extension('security-group', aliases)
        LOG.info(_LI('Disabled allowed-address-pairs extension.'))
        _disable_extension('allowed-address-pairs', aliases)

因为/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的enable_security_group参数设置如下。

enable_security_group = True

所以is_firewall_enabled函数返回true,所以disable_security_group_extension_by_config函数不会将’security-group’ alias移除。这里需要注意的。_is_valid_driver_combination函数用于check security group配置参数是否正确。其匹配如下。

enable_security_group

firewall_driver

True

firewall_driver !=  'neutron.agent.firewall.NoopFirewallDriver

False

firewall_driver == 'neutron.agent.firewall.NoopFirewallDriver

or firewall_driver is None

’vlan-transparent’ alias的remove比较简单。

#/neutron/extensions/vlantransparent.py
def disable_extension_by_config(aliases):
    if not cfg.CONF.vlan_transparent:
        if 'vlan-transparent' in aliases:
            aliases.remove('vlan-transparent')
        LOG.info(_LI('Disabled vlantransparent extension.'))

vlan_transparent 为neutron.conf配置文件中的参数。如下

vlan_transparent = False

所以’vlan-transparent’alias被remove了。

最终supported_extension_aliases函数返回的alias为12个:

['provider', 'external-net', 'binding', 'quotas', 'security-group', 'agent', 'dhcp_agent_scheduler', 'multi-provider', 'allowed-address-pairs', 'extra_dhcp_opt', 'subnet_allocation', 'net-mtu']

这里没有EXT_TO_SERVICE_MAPPING相匹配的alias。因此执行完_load_services_from_core_plugin函数后,service plugin目前还是只有core plugin被加载其中。

继续回到/neutron/manager.py:NeutronManager的_load_service_plugins函数。其内部在执行完_load_services_from_core_plugin函数后,将根据neutron.conf配置文件中的service_plugins参数去加载和创建service plugin对象。

service_plugins =neutron.services.l3_router.l3_router_plugin.L3RouterPlugin

所以最终的self.service_plugins变量的值为:

'L3_ROUTER_NAT': <neutron.services.l3_router.l3_router_plugin.L3RouterPlugin object at 0x42038d0>, 'CORE': <neutron.plugins.ml2.plugin.Ml2Plugin object at 0x360d910>

其中L3_ROUTER_NAT由get_plugin_type获取的。

#/neutron/plugins/common/constants.py
L3_ROUTER_NAT = "L3_ROUTER_NAT"

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
    def get_plugin_type(self):
        return constants.L3_ROUTER_NAT

下面我们分析serviceplugin中的L3RouterPlugin对象的创建。

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
                     extraroute_db.ExtraRoute_db_mixin,
                     l3_hamode_db.L3_HA_NAT_db_mixin,
                     l3_gwmode_db.L3_NAT_db_mixin,
                     l3_dvrscheduler_db.L3_DVRsch_db_mixin,
                     l3_hascheduler_db.L3_HA_scheduler_db_mixin):

    """Implementation of the Neutron L3 Router Service Plugin.

    This class implements a L3 service plugin that provides
    router and floatingip resources and manages associated
    request/response.
    All DB related work is implemented in classes
    l3_db.L3_NAT_db_mixin, l3_hamode_db.L3_HA_NAT_db_mixin,
    l3_dvr_db.L3_NAT_with_dvr_db_mixin, and extraroute_db.ExtraRoute_db_mixin.
    """
    supported_extension_aliases = ["dvr", "router", "ext-gw-mode",
                                   "extraroute", "l3_agent_scheduler",
                                   "l3-ha"]

    def __init__(self):
        self.setup_rpc()
        self.router_scheduler = importutils.import_object(
            cfg.CONF.router_scheduler_driver)
        self.start_periodic_l3_agent_status_check()
        super(L3RouterPlugin, self).__init__()
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        l3_db.subscribe()

首先,创建RPC-client。

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
    def setup_rpc(self):
        # RPC support
        self.topic = topics.L3PLUGIN
        self.conn = n_rpc.create_connection(new=True)
        self.agent_notifiers.update(
            q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI())
        self.endpoints = [l3_rpc.L3RpcCallback()]
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        self.conn.consume_in_threads()

其中,

#/neutron/common/topics.py
L3PLUGIN = 'q-l3-plugin'

其中self.agent_notifiers属性是从父类/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin继承而来。

#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = 
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    

self.agent_notifiers中的AGENT_TYPE_L3所对应的value值为L3AgentNotifyAPI对象,如下。

#/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py:L3AgentNotifyAPI
class L3AgentNotifyAPI(object):
    """API for plugin to notify L3 agent."""

    def __init__(self, topic=topics.L3_AGENT):
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

L3AgentNotifyAPI对象创建了topic为L3_AGENT = 'l3_agent'的RPC-client。而topic为L3_AGENT ='l3_agent'的RPC-server是由neutron-l3-agent服务启动时创建的。

#/neutron/agent/l3_agent.py
def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
    register_opts(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
service.launch(server).wait()

#neutron/service.py:Service
    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

#neutron/common/rpc.py:Service
    def start(self):
        super(Service, self).start()

        self.conn = create_connection(new=True)
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

同时setup_rpc也创建了一个topic为L3PLUGIN = 'q-l3-plugin'的RPC-server。

#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.setup_rpc
        self.endpoints = [l3_rpc.L3RpcCallback()]
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        self.conn.consume_in_threads()

其endpoints为/neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback对象。其中,/neutron/agent/l3/agent.py:L3PluginApi将创建topic为为L3PLUGIN = 'q-l3-plugin'的RPC-client(neutron-l3-agent服务启动时创建的)去调用/neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback类中的函数。

#/neutron/agent/l3/agent.py:L3PluginApi
class L3PluginApi(object):
    """Agent side of the l3 agent RPC API.

    API version history:
        1.0 - Initial version.
        1.1 - Floating IP operational status updates
        1.2 - DVR support: new L3 plugin methods added.
              - get_ports_by_subnet
              - get_agent_gateway_port
              Needed by the agent when operating in DVR/DVR_SNAT mode
        1.3 - Get the list of activated services
        1.4 - Added L3 HA update_router_state. This method was reworked in
              to update_ha_routers_states
        1.5 - Added update_ha_routers_states

    """

    def __init__(self, topic, host):
        self.host = host
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

#/neutron/agent/l3/agent.py:L3NATAgent
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                 ha.AgentMixin,
                 dvr.AgentMixin,
                 manager.Manager):
    """Manager for L3NatAgent

        API version history:
        1.0 initial Version
        1.1 changed the type of the routers parameter
            to the routers_updated method.
            It was previously a list of routers in dict format.
            It is now a list of router IDs only.
            Per rpc versioning rules,  it is backwards compatible.
        1.2 - DVR support: new L3 agent methods added.
              - add_arp_entry
              - del_arp_entry
              Needed by the L3 service when dealing with DVR
    """
    target = oslo_messaging.Target(version='1.2')

    def __init__(self, host, conf=None):
        if conf:
            self.conf = conf
        else:
            self.conf = cfg.CONF
        self.router_info = 

        self._check_config_params()

        self.process_monitor = external_process.ProcessMonitor(
            config=self.conf,
            resource_type='router')

              ... ... ...

        self.context = n_context.get_admin_context_without_session()
        self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
        self.fullsync = True
        ... ... ...

        self._queue = queue.RouterProcessingQueue()
        super(L3NATAgent, self).__init__(conf=self.conf)

        self.target_ex_net_id = None
        self.use_ipv6 = ipv6_utils.is_enabled()

        if self.conf.enable_metadata_proxy:
            self.metadata_driver = metadata_driver.MetadataDriver(self)

#/neutron/agent/l3/agent.py:L3NATAgentWithStateReport
class L3NATAgentWithStateReport(L3NATAgent):

    def __init__(self, host, conf=None):
        self.use_call = True
        super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        self.agent_state = 
            'binary': 'neutron-l3-agent',
            'host': host,
            'topic': topics.L3_AGENT,
            'configurations': 
                'agent_mode': self.conf.agent_mode,
                'use_namespaces': self.conf.use_namespaces,
                'router_id': self.conf.router_id,
                'handle_internal_only_routers':
                self.conf.handle_internal_only_routers,
                'external

以上是关于neutron-server的启动流程的主要内容,如果未能解决你的问题,请参考以下文章

neutron-server源码分析总纲

neutron 启动流程

pdb /usr/bin/neutron-server

pdb /usr/bin/neutron-server

深入理解Activity启动流程–Activity启动相关类的类图

[转] 浅谈Linux系统的启动流程