neutron-server的启动流程
Posted gj4990
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了neutron-server的启动流程相关的知识,希望对你有一定的参考价值。
neutron-server的启动包括RPC-server的创建、RPC-client的创建以及WSGI server的创建,因此neutron-server并不只是像其他组件中的api服务那样单纯提供API功能。本文将从RPC相关对象的创建和WSGI server的创建两方面对代码流程进行分析。
查看setup.cfg文件找到neutron-server的代码入口。
neutron-server = neutron.cmd.eventlet.server:main |
#/neutron/cmd/eventlet/server/__init__.py
def main():
    """Console-script entry point; delegates to ``server.main()``."""
    server.main()
#/neutron/server/__init__.py
def main():
    """Start the WSGI API service and the RPC service in one green pool.

    Exits with an error message when no configuration file was found or
    when a ``RuntimeError`` escapes the startup sequence.
    ``KeyboardInterrupt`` is swallowed so Ctrl-C ends the process quietly.
    """
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    try:
        pool = eventlet.GreenPool()

        neutron_api = service.serve_wsgi(service.NeutronApiService)
        api_thread = pool.spawn(neutron_api.wait)

        try:
            neutron_rpc = service.serve_rpc()
        except NotImplementedError:
            LOG.info(_LI("RPC was already started in parent process by "
                         "plugin."))
        else:
            rpc_thread = pool.spawn(neutron_rpc.wait)

            # api and rpc should die together. When one dies, kill the other.
            rpc_thread.link(lambda gt: api_thread.kill())
            api_thread.link(lambda gt: rpc_thread.kill())

        # Block until every spawned green thread has finished.
        pool.waitall()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)
1. WSGIserver创建
neutron_api = service.serve_wsgi(service.NeutronApiService) |
#/neutron/service.py
def serve_wsgi(cls):
    """Create a service via ``cls.create()``, start it and return it.

    Any exception raised while creating or starting the service is logged
    and then re-raised by ``save_and_reraise_exception``.
    """
    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))

    return service
#/neutron/service.py:NeutronApiService
class NeutronApiService(WsgiService):
    """Class for neutron-api service."""

    @classmethod
    def create(cls, app_name='neutron'):
        """Set up logging, dump option values and build the service.

        :param app_name: name passed to the service constructor.
        :returns: a new instance of ``cls``.
        """
        # Setup logging early, supplying both the CLI options and the
        # configuration mapping from the config file
        # We only update the conf dict for the verbose and debug
        # flags. Everything else must be set up in the conf file...
        # Log the options used when starting if we're in debug mode...

        config.setup_logging()
        # Dump the initial option values
        cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
        service = cls(app_name)
        return service
serve_wsgi函数调用传入的NeutronApiService类的create函数创建服务对象,并执行该对象的start函数,创建WSGI server。
#/neutron/service.py:WsgiService
def start(self):
    """Run the WSGI app named ``self.app_name`` and keep the result."""
    self.wsgi_app = _run_wsgi(self.app_name)
#/neutron/service.py
def _run_wsgi(app_name):
    """Load the paste application ``app_name`` and serve it.

    :returns: the started ``wsgi.Server``, or ``None`` when no application
              could be loaded.
    """
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=cfg.CONF.api_workers)
    # Dump all option values here after all options are parsed
    cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
    # The %(host)s / %(port)s placeholders need a mapping argument.
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server
其中,
app = config.load_paste_app(app_name) |
为从api-paste.ini文件加载composite为neutron的相关信息。首先创建/neutron/wsgi.py:Server对象,执行Server对象的start函数,将创建WSGI server。
#/neutron/wsgi.py:Server
def start(self, application, port, host='0.0.0.0', workers=0):
    """Run a WSGI server with the given application.

    Records host and port, obtains the listening socket through
    ``self._get_socket`` (backlog taken from ``CONF.backlog``) and hands
    over to ``self._launch``.
    """
    self._host = host
    self._port = port
    backlog = CONF.backlog

    self._socket = self._get_socket(self._host,
                                    self._port,
                                    backlog=backlog)

    self._launch(application, workers)
def _launch(self, application, workers=0):
    """Start the API service in-process or in child processes.

    :param application: the WSGI application wrapped in a WorkerService.
    :param workers: when less than 1 the service is started in the
                    current process; otherwise it is launched through a
                    ``ProcessLauncher`` with this worker count.
    """
    service = WorkerService(self, application)
    if workers < 1:
        # The API service should run in the current process.
        self._server = service
        service.start()
        systemd.notify_once()
    else:
        # dispose the whole pool before os.fork, otherwise there will
        # be shared DB connections in child processes which may cause
        # DB errors.
        if CONF.database.connection:
            api.get_engine().pool.dispose()
        # The API service runs in a number of child processes.
        # Minimize the cost of checking for child exit by extending the
        # wait interval past the default of 0.01s.
        self._server = common_service.ProcessLauncher(wait_interval=1.0)
        self._server.launch_service(service, workers=workers)
其中,start函数中的host即为neutron.conf配置文件中bind_host参数,一般设置为管理网络IP,port为neutron.conf文件中的bind_port参数,一般neutron-server的bind_port参数设置为9696。首先在start函数调用self._get_socket函数创建一个socket去监听本机的9696端口。以下是我的OpenStack环境的neutron-server进程。
[root@jun neutron]# netstat -tnulp | grep 9696
tcp 0 0 192.168.118.1:9696 0.0.0.0:* LISTEN 35298/python2
然后在_launch函数中创建WSGI server进程(即创建子进程用于处理neutronclient发来的HTTP请求)。这里WSGI server进程的个数由neutron.conf配置文件中的api_workers参数指定,一般设置为系统cpu的个数。
具体的创建WSGI server进程的代码分析可以参看《 keystone WSGI流程》文章。下面分析最终根据api-paste.ini文件加载的可调用的resources。api-paste.ini文件内容如下。
[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = request_id catch_errors extensions neutronapiapp_v2_0
keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[filter:request_id]
paste.filter_factory = oslo.middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo.middleware:CatchErrors.factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory
composite为neutron时,有两个分支,一个是返回neutronversions的分支,一个是到composite为neutronapi_v2_0去调用相应的资源。我们发送HTTP请求到neutron,一般会调用/v2.0上的资源。比如create_port, update_port等等。查看composite为neutronapi_v2_0的部分,不管是noauth还是keystone,最终都将到app为neutronapiapp_v2_0的分支去加载资源。其中factory函数如下。
#/neutron/api/v2/router.py:APIRouter
class APIRouter(wsgi.Router):
    """WSGI router that maps the v2.0 resources onto plugin controllers."""

    @classmethod
    def factory(cls, global_config, **local_config):
        """paste.app_factory hook; ``global_config`` is not used."""
        return cls(**local_config)

    def __init__(self, **local_config):
        """Build the route mapper for RESOURCES and SUB_RESOURCES."""
        mapper = routes_mapper.Mapper()
        plugin = manager.NeutronManager.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            # Create one controller per resource and register its
            # collection routes on the shared mapper.
            allow_bulk = cfg.CONF.allow_bulk
            allow_pagination = cfg.CONF.allow_pagination
            allow_sorting = cfg.CONF.allow_sorting
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=allow_pagination,
                allow_sorting=allow_sorting)
            path_prefix = None
            if parent:
                # NOTE(review): the braces around %s_id mark a Routes path
                # variable; they were missing in the flattened listing and
                # are restored here -- confirm against the neutron source.
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)
1.1 创建plugin(包括core plugin和service plugin)
#/neutron/api/v2/router.py:APIRouter
plugin = manager.NeutronManager.get_plugin()
#/neutron/manager.py:NeutronManager
@classmethod
def get_plugin(cls):
    """Return a weak proxy to the ``plugin`` of the manager instance."""
    # Return a weakref to minimize gc-preventing references.
    return weakref.proxy(cls.get_instance().plugin)
#/neutron/manager.py:NeutronManager
@classmethod
def get_instance(cls):
    """Return ``cls._instance``, creating it first when none exists."""
    # double checked locking
    if not cls.has_instance():
        cls._create_instance()
    return cls._instance
#/neutron/manager.py:NeutronManager
@classmethod
@utils.synchronized("manager")
def _create_instance(cls):
    """Instantiate ``cls`` into ``cls._instance`` unless one exists.

    Runs under the "manager" lock and re-checks ``has_instance()`` so a
    second caller does not build another instance.
    """
    if not cls.has_instance():
        cls._instance = cls()
#/neutron/manager.py:NeutronManager
class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    The caller should make sure that NeutronManager is a singleton.
    """
    _instance = None

    def __init__(self, options=None, config_file=None):
        """Load the core plugin, then the service plugins.

        :raises Exception: when the pre- or post-load validation returns
                           a message.
        """
        # If no options have been provided, create an empty dict
        if not options:
            options = {}

        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info(_LI("Loading core plugin: %s"), plugin_provider)
        self.plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                                plugin_provider)
        msg = validate_post_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()
get_plugin函数返回NeutronManager的plugin(core plugin)的弱引用,那么core plugin是什么对象呢?core plugin和service plugin在NeutronManager的__init__函数创建的。其中core plugin和service plugin分别根据neutron.conf配置文件中的core_plugin参数和service_plugins参数进行创建。
1.1.1 core plugin的创建
我的OpenStack环境的neutron.conf配置文件中的core_plugin参数值为:
core_plugin =neutron.plugins.ml2.plugin.Ml2Plugin |
其创建在_get_plugin_instance函数中执行。
#/neutron/manager.py:NeutronManager
def _get_plugin_instance(self, namespace, plugin_provider):
    """Resolve ``plugin_provider`` to a class and return an instance.

    The provider is first looked up as a driver name in ``namespace``;
    if that raises ``RuntimeError`` it is imported as a class path.

    :raises ImportError: when both lookups fail.
    """
    try:
        # Try to resolve plugin by name
        mgr = driver.DriverManager(namespace, plugin_provider)
        plugin_class = mgr.driver
    except RuntimeError as e1:
        # fallback to class name
        try:
            plugin_class = importutils.import_class(plugin_provider)
        except ImportError as e2:
            LOG.exception(_LE("Error loading plugin by name, %s"), e1)
            LOG.exception(_LE("Error loading plugin by class, %s"), e2)
            raise ImportError(_("Plugin not found."))
    return plugin_class()
利用stevedore模块创建core_plugin,即core_plugin为/neutron/plugins/ml2/plugin.py:Ml2Plugin对象。
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
def __init__(self):
    """Create the ML2 managers, initialize them and set up RPC."""
    # First load drivers, then initialize DB, then initialize drivers
    self.type_manager = managers.TypeManager()
    self.extension_manager = managers.ExtensionManager()
    self.mechanism_manager = managers.MechanismManager()
    super(Ml2Plugin, self).__init__()
    self.type_manager.initialize()
    self.extension_manager.initialize()
    self.mechanism_manager.initialize()

    self._setup_rpc()

    # REVISIT(rkukura): Use stevedore for these?
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver
    )

    self.start_periodic_dhcp_agent_status_check()
    LOG.info(_LI("Modular L2 Plugin initialization complete"))
由于type_manager,extension_manager和mechanism_manager的创建都类似,所以这里我们主要分析type_manager代码流程,其余的简要说明。
#/neutron/plugins/ml2/managers.py:TypeManager
class TypeManager(stevedore.named.NamedExtensionManager):
    """Manage network segment types using drivers."""

    def __init__(self):
        """Load the configured type drivers and register them by type."""
        # Mapping from type name to DriverManager
        self.drivers = {}

        LOG.info(_LI("Configured type driver names: %s"),
                 cfg.CONF.ml2.type_drivers)
        super(TypeManager, self).__init__('neutron.ml2.type_drivers',
                                          cfg.CONF.ml2.type_drivers,
                                          invoke_on_load=True)
        LOG.info(_LI("Loaded type driver names: %s"), self.names())
        self._register_types()
        self._check_tenant_network_types(cfg.CONF.ml2.tenant_network_types)
这里,TypeManager对象中的drivers即为type driver对象。drivers根据/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的type_drivers参数去构造。在我的OpenStack环境中type_drivers参数值如下。
type_drivers = vlan,flat |
两个driver对象的创建在TypeManager类的父类中完成。且创建两个对象被/stevedore/extension.py:Extension包裹。
#/neutron/plugins/ml2/managers.py:TypeManager
def _register_types(self):
    """Index the loaded extensions in ``self.drivers`` by network type.

    The first extension seen for a type wins; later ones for the same
    type are reported with an error log and ignored.
    """
    for ext in self:
        network_type = ext.obj.get_type()
        if network_type in self.drivers:
            LOG.error(_LE("Type driver '%(new_driver)s' ignored because"
                          " type driver '%(old_driver)s' is already"
                          " registered for type '%(type)s'"),
                      {'new_driver': ext.name,
                       'old_driver': self.drivers[network_type].name,
                       'type': network_type})
        else:
            self.drivers[network_type] = ext
    LOG.info(_LI("Registered types: %s"), self.drivers.keys())
这里,将创建完成的typedriver对象register到self.drivers字典中,如下。
self.drivers: {'flat': <stevedore.extension.Extension object at 0x428cad0>, 'vlan': <stevedore.extension.Extension object at 0x4299150>} |
如果要调用flat或vlan对应的type driver的函数,需要访问/stevedore/extension.py:Extension对象的obj成员变量,obj成员变量即为实际的type driver对象。如下
'flat': <neutron.plugins.ml2.drivers.type_flat.FlatTypeDriver object at 0x46cfb50>
'vlan': <neutron.plugins.ml2.drivers.type_vlan.VlanTypeDriver object at 0x46a9bd0> |
在register typedriver后,check tenant的network type是否在我们所register的type driver中,如果没有,则raise异常。其中tenant的network type是由/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的tenant_network_types参数设置。我的OpenStack环境的tenant_network_types参数配置如下。
tenant_network_types = vlan |
#/neutron/plugins/ml2/managers.py:TypeManager
def _check_tenant_network_types(self, types):
    """Validate ``types`` against the registered type drivers.

    Accepted types are collected in ``self.tenant_network_types``.

    :raises SystemExit: on the first type with no registered driver.
    """
    self.tenant_network_types = []
    for network_type in types:
        if network_type in self.drivers:
            self.tenant_network_types.append(network_type)
        else:
            LOG.error(_LE("No type driver for tenant network_type: %s. "
                          "Service terminated!"), network_type)
            raise SystemExit(1)
    LOG.info(_LI("Tenant network_types: %s"), self.tenant_network_types)
所以执行_check_tenant_network_types函数时,tenant network type校验成功。
mechanism_manager的创建也是类似的,它管理的mechanism driver。其根据/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的mechanism_drivers参数去构造mechanism driver对象。我的OpenStack环境mechanism_drivers配置参数如下。
mechanism_drivers =linuxbridge |
构建的mechanismdriver对象为:
<neutron.plugins.ml2.drivers.mech_linuxbridge.LinuxbridgeMechanismDriver object at 0x3f42b90> |
extension_manager的创建也类似。
在type_manager,extension_manager和mechanism_manager创建完成后,执行initialize函数对所创建的driver进行初始化。比如type_manager的initialize函数如下。
#/neutron/plugins/ml2/managers.py:TypeManager
def initialize(self):
    """Call ``initialize()`` on every registered type driver object."""
    for network_type, driver in self.drivers.iteritems():
        LOG.info(_LI("Initializing driver for type '%s'"), network_type)
        driver.obj.initialize()
从上可以看出,最终会调用type_manager所管理的driver的initialize函数。
下面分析创建rpc-client的代码流程。即执行以下代码。
self._setup_rpc() |
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
def _setup_rpc(self):
    """Create the agent notifier and register the DHCP agent notifier."""
    self.notifier = rpc.AgentNotifierApi(topics.AGENT)
    self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
        dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
    )
这里topics.AGENT=’q-agent-notifier’。
#/neutron/plugins/ml2/rpc.py:AgentNotifierApi
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
                       sg_rpc.SecurityGroupAgentRpcApiMixin,
                       type_tunnel.TunnelAgentRpcApiMixin):
    """Agent side of the openvswitch rpc API.

    API version history:
        1.0 - Initial version.
        1.1 - Added get_active_networks_info, create_dhcp_port,
              update_dhcp_port, and removed get_dhcp_port methods.
    """

    def __init__(self, topic):
        """Derive the per-operation topic names and build the RPC client.

        :param topic: prefix used for the network-delete, port-update and
                      port-delete topic names and as the client target.
        """
        self.topic = topic
        self.topic_network_delete = topics.get_topic_name(topic,
                                                          topics.NETWORK,
                                                          topics.DELETE)
        self.topic_port_update = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.UPDATE)
        self.topic_port_delete = topics.get_topic_name(topic,
                                                       topics.PORT,
                                                       topics.DELETE)

        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
#/neutron/common/topics.py
def get_topic_name(prefix, table, operation, host=None):
    """Create a topic name.

    The topic name needs to be synced between the agent and the
    plugin. The plugin will send a fanout message to all of the
    listening agents so that the agents in turn can perform their
    updates accordingly.

    :param prefix: Common prefix for the plugin/agent message queues.
    :param table: The table in question (NETWORK, SUBNET, PORT).
    :param operation: The operation that invokes notification (CREATE,
                      DELETE, UPDATE)
    :param host: Add host to the topic
    :returns: The topic name.
    """
    if host:
        # Host-specific topics append ".<host>" to the common name.
        return '%s-%s-%s.%s' % (prefix, table, operation, host)
    return '%s-%s-%s' % (prefix, table, operation)
其中/neutron/common/topics.py文件中定义了一些常量如下。
#/neutron/common/topics.py
# Resource ("table") names used when building topic names.
NETWORK = 'network'
SUBNET = 'subnet'
PORT = 'port'
SECURITY_GROUP = 'security_group'
L2POPULATION = 'l2population'
DVR = 'dvr'

# Operation names used when building topic names.
CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'

# Topic names / prefixes.
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
L3PLUGIN = 'q-l3-plugin'
DHCP = 'q-dhcp-notifer'
FIREWALL_PLUGIN = 'q-firewall-plugin'
METERING_PLUGIN = 'q-metering-plugin'
LOADBALANCER_PLUGIN = 'n-lbaas-plugin'

L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent'
LOADBALANCER_AGENT = 'n-lbaas_agent'
所以AgentNotifierApi对象创建了通过AMQP到topic为’q-agent-notifier’的RPC-server的绑定。即AgentNotifierApi可远程调用执行topic为’q-agent-notifier’的RPC-server上的函数。不过这里,并没有纯粹的调用topic为’q-agent-notifier’的RPC-server函数。而是通过prepare函数修改topic构建新的Target来调用3种另外的topic的RPC-server上的函数。这3种topic为(根据get_topic_name函数构造出来的):
‘q-agent-notifier-network-delete’ ‘q-agent-notifier-port-update’ ‘q-agent-notifier-port-delete’ |
那么对应的这3个topic的RPC-server是哪个服务构建的呢?通过AgentNotifierApi类的解释可知,应该是neutron-openvswitch-agent服务构建的。如下
#/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py:OVSNeutronAgent
def setup_rpc(self):
    """Create the plugin-facing RPC clients and the agent consumers.

    The consumers are created with ``start_listening=False``, so this
    method does not start consuming.
    """
    self.agent_id = 'ovs-agent-%s' % cfg.CONF.host
    self.topic = topics.AGENT
    self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
    self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
    self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

    # RPC network init
    self.context = context.get_admin_context_without_session()
    # Handle updates from service
    self.endpoints = [self]
    # Define the listening consumers for the agent
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.PORT, topics.DELETE],
                 [topics.NETWORK, topics.DELETE],
                 [constants.TUNNEL, topics.UPDATE],
                 [constants.TUNNEL, topics.DELETE],
                 [topics.SECURITY_GROUP, topics.UPDATE],
                 [topics.DVR, topics.UPDATE]]
    if self.l2_pop:
        consumers.append([topics.L2POPULATION,
                          topics.UPDATE, cfg.CONF.host])
    self.connection = agent_rpc.create_consumers(self.endpoints,
                                                 self.topic,
                                                 consumers,
                                                 start_listening=False)
#/neutron/agent/rpc.py
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
    """Create agent RPC consumers.

    :param endpoints: The list of endpoints to process the incoming messages.
    :param prefix: Common prefix for the plugin/agent message queues.
    :param topic_details: A list of topics. Each topic has a name, an
                          operation, and an optional host param keying the
                          subscription to topic.host for plugin calls.
    :param start_listening: if True, it starts the processing loop

    :returns: A common Connection.
    """
    connection = n_rpc.create_connection(new=True)
    for details in topic_details:
        # Pad two-element entries with None so node_name is always bound.
        topic, operation, node_name = itertools.islice(
            itertools.chain(details, [None]), 3)

        topic_name = topics.get_topic_name(prefix, topic, operation)
        connection.create_consumer(topic_name, endpoints, fanout=True)
        if node_name:
            # A host-specific, non-fanout consumer is added on top of the
            # fanout one.
            node_topic_name = '%s.%s' % (topic_name, node_name)
            connection.create_consumer(node_topic_name,
                                       endpoints,
                                       fanout=False)
    if start_listening:
        connection.consume_in_threads()
    return connection
从上面的代码可以看出,neutron-openvswitch-agent构建的RPC-server将会最多有8个topic。即
‘q-agent-notifier-port-update’ ‘q-agent-notifier-port-delete’ ‘q-agent-notifier-network-delete’ ‘q-agent-notifier-tunnel-update’ ‘q-agent-notifier-tunnel-delete’ ‘q-agent-notifier-security_group-update’ ‘q-agent-notifier-dvr-update’ ‘q-agent-notifier-l2population-update.jun2’ |
从上面8个topic可以看出,包括AgentNotifierApi类所构建RPC-client所需的topic。
不过,我的OpenStack环境采用的是linuxbridge的mechanism driver,而neutron-linuxbridge-agent服务启动创建RPC-server时,没有topic为’q-agent-notifier-port-delete’的Target,所以采用linuxbridge的mechanism driver时,AgentNotifierApi对象不能调用port_delete函数。如下
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def setup_rpc(self, physical_interfaces):
    """Derive the agent id, create the consumers and start the heartbeat.

    :param physical_interfaces: interface names; the MAC of the first one
                                is used for the agent id. When empty, the
                                first device returned by ``IPWrapper`` is
                                used, and the process exits if there is
                                none.
    """
    if physical_interfaces:
        mac = utils.get_interface_mac(physical_interfaces[0])
    else:
        devices = ip_lib.IPWrapper().get_devices(True)
        if devices:
            mac = utils.get_interface_mac(devices[0].name)
        else:
            LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                          "Agent terminated!"))
            exit(1)
    self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
    LOG.info(_LI("RPC agent_id: %s"), self.agent_id)

    self.topic = topics.AGENT
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
    # RPC network init
    # Handle updates from service
    self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
                                              self.sg_agent)]
    # Define the listening consumers for the agent
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.NETWORK, topics.DELETE],
                 [topics.SECURITY_GROUP, topics.UPDATE]]
    if cfg.CONF.VXLAN.l2_population:
        consumers.append([topics.L2POPULATION,
                          topics.UPDATE, cfg.CONF.host])
    self.connection = agent_rpc.create_consumers(self.endpoints,
                                                 self.topic,
                                                 consumers)
    report_interval = cfg.CONF.AGENT.report_interval
    if report_interval:
        heartbeat = loopingcall.FixedIntervalLoopingCall(
            self._report_state)
        heartbeat.start(interval=report_interval)
下面分析另一个跟dhcp相关的RPC-client的创建。
self.agent_notifiers[const.AGENT_TYPE_DHCP] = ( dhcp_rpc_agent_api.DhcpAgentNotifyAPI() ) |
其中self.agent_notifiers属性是从父类/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin继承而来。
#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }
DhcpAgentNotifyAPI对象的创建如下。
#/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py:DhcpAgentNotifyAPI
class DhcpAgentNotifyAPI(object):
    """API for plugin to notify DHCP agent.

    This class implements the client side of an rpc interface.  The server side
    is neutron.agent.dhcp_agent.DhcpAgent.  For more information about changing
    rpc interfaces, please see doc/source/devref/rpc_api.rst.
    """
    # It seems dhcp agent does not support bulk operation
    VALID_RESOURCES = ['network', 'subnet', 'port']
    VALID_METHOD_NAMES = ['network.create.end',
                          'network.update.end',
                          'network.delete.end',
                          'subnet.create.end',
                          'subnet.update.end',
                          'subnet.delete.end',
                          'port.create.end',
                          'port.update.end',
                          'port.delete.end']

    def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
        """Store ``plugin`` and build an RPC client targeting ``topic``."""
        self._plugin = plugin
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
其中RPC-client的topic的值为’dhcp_agent’。而对应的RPC-server是在neutron-dhcp-agent服务启动时创建的。
#neutron/agent/dhcp_agent.py
def main():
    """Entry point of neutron-dhcp-agent: build the service and wait."""
    register_options()
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-dhcp-agent',
        topic=topics.DHCP_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
    service.launch(server).wait()
#neutron/service.py:Service
def start(self):
    """Initialise the manager, start the base service and the reporter.

    When ``self.report_interval`` is set, ``self.report_state`` is run
    periodically with that interval (also used as the initial delay).
    """
    self.manager.init_host()
    super(Service, self).start()
    if self.report_interval:
        pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
        pulse.start(interval=self.report_interval,
                    initial_delay=self.report_interval)
        self.timers.append(pulse)
#neutron/common/rpc.py:Service
def start(self):
    """Create the consumer for ``self.topic`` and start consuming.

    The manager is the only endpoint. If the manager defines a callable
    ``initialize_service_hook`` it is invoked before consuming starts.
    """
    super(Service, self).start()

    self.conn = create_connection(new=True)
    LOG.debug("Creating Consumer connection for Service %s",
              self.topic)

    endpoints = [self.manager]

    self.conn.create_consumer(self.topic, endpoints)

    # Hook to allow the manager to do other initializations after
    # the rpc connection is created.
    if callable(getattr(self.manager, 'initialize_service_hook', None)):
        self.manager.initialize_service_hook(self)

    # Consume from all consumers in threads
    self.conn.consume_in_threads()

    if self.periodic_interval:
        if self.periodic_fuzzy_delay:
            # Random initial delay in [0, periodic_fuzzy_delay].
            initial_delay = random.randint(0, self.periodic_fuzzy_delay)
        else:
            initial_delay = None

        periodic = loopingcall.FixedIntervalLoopingCall(
            self.periodic_tasks)
        periodic.start(interval=self.periodic_interval,
                       initial_delay=initial_delay)
        self.timers.append(periodic)
    self.manager.after_start()
总结一下,上面的coreplugin中的RPC-client与RPC-server的对应关系。
| RPC-client Class | RPC-client service | RPC-server endpoints | RPC-server topic | RPC-server service |
| --- | --- | --- | --- | --- |
| AgentNotifierApi | neutron-server (core plugin) | OVSNeutronAgent | q-agent-notifier-xxx-yyy (topics.AGENT-xxx-yyy) | neutron-openvswitch-agent |
| AgentNotifierApi | neutron-server (core plugin) | LinuxBridgeRpcCallbacks | q-agent-notifier-xxx-yyy (topics.AGENT-xxx-yyy) | neutron-linuxbridge-agent |
| DhcpAgentNotifyAPI | neutron-server (core plugin) | DhcpAgentWithStateReport (inherit from DhcpAgent) | dhcp_agent (topics.DHCP_AGENT) | neutron-dhcp-agent |
其中,xxx表示资源(如port,network等),yyy表示操作(如update,delete等)。
至此_setup_rpc函数分析完成。
Ml2Plugin的__init__函数还剩下以下代码。
#neutron/plugins/ml2/plugin.py:Ml2Plugin.__init__
# REVISIT(rkukura): Use stevedore for these?
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
)
self.start_periodic_dhcp_agent_status_check()
LOG.info(_LI("Modular L2 Plugin initialization complete"))
network_scheduler_driver是在neutron.conf配置文件中的参数,其默认值为:
network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler |
所以self.network_scheduler为/neutron/scheduler/dhcp_agent_scheduler.py:ChanceScheduler对象。
start_periodic_dhcp_agent_status_check函数是一个周期性检测函数。
目前把core plugin的代码流程分析完成。下面分析service plugin的代码流程。
1.1.2 service plugin的创建
#/neutron/manager.py:NeutronManager.__init__
# core plugin as a part of plugin collection simplifies
# checking extensions
# TODO(enikanorov): make core plugin the same as
# the rest of service plugins
self.service_plugins = {constants.CORE: self.plugin}
self._load_service_plugins()
service plugin首先将core plugin载入到self.service_plugins中,然后执行self._load_service_plugins函数将自身的service plugin载入到self.service_plugins中。
#/neutron/manager.py:NeutronManager
def _load_service_plugins(self):
    """Loads service plugins.

    Starts from the core plugin and checks if it supports
    advanced services then loads classes provided in configuration.

    :raises ValueError: when two plugins report the same plugin type.
    """
    # load services from the core plugin first
    self._load_services_from_core_plugin()

    plugin_providers = cfg.CONF.service_plugins
    LOG.debug("Loading service plugins: %s", plugin_providers)
    for provider in plugin_providers:
        if provider == '':
            continue

        LOG.info(_LI("Loading Plugin: %s"), provider)
        plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                provider)

        # only one implementation of svc_type allowed
        # specifying more than one plugin
        # for the same type is a fatal exception
        if plugin_inst.get_plugin_type() in self.service_plugins:
            raise ValueError(_("Multiple plugins for service "
                               "%s were configured") %
                             plugin_inst.get_plugin_type())

        self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

        # search for possible agent notifiers declared in service plugin
        # (needed by agent management extension)
        if (hasattr(self.plugin, 'agent_notifiers') and
                hasattr(plugin_inst, 'agent_notifiers')):
            self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)

        LOG.debug("Successfully loaded %(type)s plugin. "
                  "Description: %(desc)s",
                  {"type": plugin_inst.get_plugin_type(),
                   "desc": plugin_inst.get_plugin_description()})
首先从core plugin中load service plugin。
#/neutron/manager.py:NeutronManager
def _load_services_from_core_plugin(self):
    """Puts core plugin in service_plugins for supported services."""
    LOG.debug("Loading services supported by the core plugin")

    # supported service types are derived from supported extensions
    for ext_alias in getattr(self.plugin,
                             "supported_extension_aliases", []):
        if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
            service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
            self.service_plugins[service_type] = self.plugin
            LOG.info(_LI("Service %s is supported by the core plugin"),
                     service_type)
其中EXT_TO_SERVICE_MAPPING的信息如下。
#/neutron/plugins/common/constants.py
# service type constants:
CORE = "CORE"
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
LOADBALANCERV2 = "LOADBALANCERV2"
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"

# maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
    'dummy': DUMMY,
    'lbaas': LOADBALANCER,
    'lbaasv2': LOADBALANCERV2,
    'fwaas': FIREWALL,
    'vpnaas': VPN,
    'metering': METERING,
    'router': L3_ROUTER_NAT
}
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
# List of supported extensions
_supported_extension_aliases = ["provider", "external-net", "binding",
                                "quotas", "security-group", "agent",
                                "dhcp_agent_scheduler",
                                "multi-provider", "allowed-address-pairs",
                                "extra_dhcp_opt", "subnet_allocation",
                                "net-mtu", "vlan-transparent"]

@property
def supported_extension_aliases(self):
    """Return the extension aliases, computing them on first access.

    The static list is copied, extended with the aliases reported by the
    extension manager, then passed through the security-group and
    vlan-transparent config filters. The result is cached in
    ``self._aliases``.
    """
    if not hasattr(self, '_aliases'):
        # Copy so the class-level list is never mutated by the filters.
        aliases = self._supported_extension_aliases[:]
        aliases += self.extension_manager.extension_aliases()
        sg_rpc.disable_security_group_extension_by_config(aliases)
        vlantransparent.disable_extension_by_config(aliases)
        self._aliases = aliases
    return self._aliases
supported_extension_aliases函数将_supported_extension_aliases列表的alias赋给self._aliases变量。不过_supported_extension_aliases列表有13个alias,在supported_extension_aliases函数会判断是否移除’security-group’和’vlan-transparent’这两个alias。拿security-group举例说明。
#/neutron/agent/securitygroups_rpc.py
#This is backward compatibility check for Havana
def _is_valid_driver_combination():
    """Check that enable_security_group agrees with firewall_driver.

    Truthy when security groups are enabled and a firewall driver other
    than NoopFirewallDriver is configured, or when they are disabled and
    the driver is NoopFirewallDriver or None.
    """
    return ((cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver and
              cfg.CONF.SECURITYGROUP.firewall_driver !=
              'neutron.agent.firewall.NoopFirewallDriver')) or
            (not cfg.CONF.SECURITYGROUP.enable_security_group and
             (cfg.CONF.SECURITYGROUP.firewall_driver ==
             'neutron.agent.firewall.NoopFirewallDriver' or
              cfg.CONF.SECURITYGROUP.firewall_driver is None)
             ))
def is_firewall_enabled():
    """Return the ``enable_security_group`` setting.

    A warning is logged when the driver configuration does not match
    that setting; the return value is not affected by the check.
    """
    if not _is_valid_driver_combination():
        LOG.warn(_LW("Driver configuration doesn't match with "
                     "enable_security_group"))

    return cfg.CONF.SECURITYGROUP.enable_security_group
def _disable_extension(extension, aliases):
if extension in aliases:
aliases.remove(extension)
def disable_security_group_extension_by_config(aliases):
    """Drop security-group related aliases when the firewall is disabled.

    When ``is_firewall_enabled()`` is falsy, both 'security-group' and
    'allowed-address-pairs' are removed from ``aliases`` in place.
    """
    if not is_firewall_enabled():
        LOG.info(_LI('Disabled security-group extension.'))
        _disable_extension('security-group', aliases)
        LOG.info(_LI('Disabled allowed-address-pairs extension.'))
        _disable_extension('allowed-address-pairs', aliases)
因为/etc/neutron/plugins/ml2/ml2_conf.ini配置文件中的enable_security_group参数设置如下。
enable_security_group = True |
所以is_firewall_enabled函数返回True,disable_security_group_extension_by_config函数不会将’security-group’ alias移除。这里需要注意的是,_is_valid_driver_combination函数用于检查security group相关的配置参数是否匹配。其有效的匹配组合如下。
| enable_security_group | firewall_driver |
| --- | --- |
| True | firewall_driver != 'neutron.agent.firewall.NoopFirewallDriver' |
| False | firewall_driver == 'neutron.agent.firewall.NoopFirewallDriver' or firewall_driver is None |
’vlan-transparent’ alias的remove比较简单。
#/neutron/extensions/vlantransparent.py
def disable_extension_by_config(aliases):
    """Remove 'vlan-transparent' from ``aliases`` when it is disabled.

    Acts only when ``cfg.CONF.vlan_transparent`` is falsy; the list is
    modified in place.
    """
    if not cfg.CONF.vlan_transparent:
        if 'vlan-transparent' in aliases:
            aliases.remove('vlan-transparent')
        LOG.info(_LI('Disabled vlantransparent extension.'))
vlan_transparent 为neutron.conf配置文件中的参数。如下
vlan_transparent = False |
所以’vlan-transparent’alias被remove了。
最终supported_extension_aliases函数返回的alias为12个:
['provider', 'external-net', 'binding', 'quotas', 'security-group', 'agent', 'dhcp_agent_scheduler', 'multi-provider', 'allowed-address-pairs', 'extra_dhcp_opt', 'subnet_allocation', 'net-mtu'] |
这里没有EXT_TO_SERVICE_MAPPING相匹配的alias。因此执行完_load_services_from_core_plugin函数后,service plugin目前还是只有core plugin被加载其中。
继续回到/neutron/manager.py:NeutronManager的_load_service_plugins函数。其内部在执行完_load_services_from_core_plugin函数后,将根据neutron.conf配置文件中的service_plugins参数去加载和创建service plugin对象。
service_plugins =neutron.services.l3_router.l3_router_plugin.L3RouterPlugin |
所以最终的self.service_plugins变量的值为:
{'L3_ROUTER_NAT': <neutron.services.l3_router.l3_router_plugin.L3RouterPlugin object at 0x42038d0>, 'CORE': <neutron.plugins.ml2.plugin.Ml2Plugin object at 0x360d910>} |
其中L3_ROUTER_NAT由get_plugin_type获取的。
#/neutron/plugins/common/constants.py
L3_ROUTER_NAT = "L3_ROUTER_NAT"
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
def get_plugin_type(self):
    """Return the plugin-type identifier of this service plugin.

    :returns: ``constants.L3_ROUTER_NAT``.
    """
    return constants.L3_ROUTER_NAT
下面我们分析service plugin中L3RouterPlugin对象的创建。
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
                     extraroute_db.ExtraRoute_db_mixin,
                     l3_hamode_db.L3_HA_NAT_db_mixin,
                     l3_gwmode_db.L3_NAT_db_mixin,
                     l3_dvrscheduler_db.L3_DVRsch_db_mixin,
                     l3_hascheduler_db.L3_HA_scheduler_db_mixin):
    """Implementation of the Neutron L3 Router Service Plugin.

    This class implements a L3 service plugin that provides
    router and floatingip resources and manages associated
    request/response.
    All DB related work is implemented in classes
    l3_db.L3_NAT_db_mixin, l3_hamode_db.L3_HA_NAT_db_mixin,
    l3_dvr_db.L3_NAT_with_dvr_db_mixin, and extraroute_db.ExtraRoute_db_mixin.
    """

    # Extension aliases this plugin advertises.
    supported_extension_aliases = ["dvr", "router", "ext-gw-mode",
                                   "extraroute", "l3_agent_scheduler",
                                   "l3-ha"]

    def __init__(self):
        """Set up RPC, load the router scheduler and register callbacks."""
        # Creates the RPC connection, the agent notifier and the consumer.
        self.setup_rpc()
        # The scheduler class is selected by configuration and instantiated
        # from its dotted import path.
        self.router_scheduler = importutils.import_object(
            cfg.CONF.router_scheduler_driver)
        self.start_periodic_l3_agent_status_check()
        super(L3RouterPlugin, self).__init__()
        if 'dvr' in self.supported_extension_aliases:
            l3_dvrscheduler_db.subscribe()
        # NOTE(review): indentation was lost in this excerpt; this call is
        # placed outside the 'dvr' check -- confirm against upstream.
        l3_db.subscribe()
首先,创建RPC-client。
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin
def setup_rpc(self):
    """Create the plugin's RPC notifier and start its RPC consumer.

    Registers an ``L3AgentNotifyAPI`` under ``q_const.AGENT_TYPE_L3`` in
    ``self.agent_notifiers``, then creates a non-fanout consumer on
    ``topics.L3PLUGIN`` whose only endpoint is an ``L3RpcCallback``.
    """
    # RPC support
    self.topic = topics.L3PLUGIN
    self.conn = n_rpc.create_connection(new=True)
    # update() needs a mapping: the braces of this dict literal were
    # missing from the excerpt and are restored here.
    self.agent_notifiers.update(
        {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
    self.endpoints = [l3_rpc.L3RpcCallback()]
    self.conn.create_consumer(self.topic, self.endpoints,
                              fanout=False)
    self.conn.consume_in_threads()
其中,
#/neutron/common/topics.py
# Topic on which L3RouterPlugin.setup_rpc() creates its RPC consumer.
L3PLUGIN = 'q-l3-plugin'
其中self.agent_notifiers属性是从父类/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin继承而来。
#/neutron/db/agentschedulers_db.py:AgentSchedulerDbMixin
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
    """Common class for agent scheduler mixins."""

    # agent notifiers to handle agent update operations;
    # should be updated by plugins;
    # NOTE: this is a class-level dict, so an in-place ``.update()`` through
    # an instance mutates the object shared by every class that inherits it
    # without rebinding the attribute.
    agent_notifiers = {
        constants.AGENT_TYPE_DHCP: None,
        constants.AGENT_TYPE_L3: None,
        constants.AGENT_TYPE_LOADBALANCER: None,
    }
self.agent_notifiers中的AGENT_TYPE_L3所对应的value值为L3AgentNotifyAPI对象,如下。
#/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py:L3AgentNotifyAPI
class L3AgentNotifyAPI(object):
    """API for plugin to notify L3 agent."""

    def __init__(self, topic=topics.L3_AGENT):
        """Create the RPC client used to reach L3 agents.

        :param topic: messaging topic for the client's target; defaults to
            ``topics.L3_AGENT``.
        """
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
L3AgentNotifyAPI对象创建了topic为L3_AGENT = 'l3_agent'的RPC-client。而topic为L3_AGENT ='l3_agent'的RPC-server是由neutron-l3-agent服务启动时创建的。
#/neutron/agent/l3_agent.py
def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
    """Entry point of the neutron-l3-agent service.

    Registers options, parses the command line, sets up logging, then
    creates and launches a ``Service`` listening on ``topics.L3_AGENT``.

    :param manager: dotted path of the manager class handed to
        ``Service.create``.
    """
    register_opts(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
    # Blocks until the launched service exits.
    service.launch(server).wait()
#neutron/service.py:Service
def start(self):
    """Initialise the manager, start the base service, schedule reports.

    When ``self.report_interval`` is truthy, a fixed-interval looping call
    invoking ``self.report_state`` is started and kept in ``self.timers``.
    """
    self.manager.init_host()
    super(Service, self).start()
    if self.report_interval:
        pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
        # The first report is delayed by one full interval.
        pulse.start(interval=self.report_interval,
                    initial_delay=self.report_interval)
        self.timers.append(pulse)
#neutron/common/rpc.py:Service
def start(self):
    """Create the RPC consumer for this service and start consuming.

    The manager object is the only RPC endpoint registered on
    ``self.topic``. Afterwards the optional periodic task timer is started
    and ``self.manager.after_start()`` is called.
    """
    super(Service, self).start()
    self.conn = create_connection(new=True)
    LOG.debug("Creating Consumer connection for Service %s",
              self.topic)
    endpoints = [self.manager]
    self.conn.create_consumer(self.topic, endpoints)
    # Hook to allow the manager to do other initializations after
    # the rpc connection is created.
    if callable(getattr(self.manager, 'initialize_service_hook', None)):
        self.manager.initialize_service_hook(self)
    # Consume from all consumers in threads
    self.conn.consume_in_threads()
    # NOTE(review): the excerpt lost its indentation, and the section below
    # may actually be the tail of neutron/service.py:Service.start rather
    # than of this method -- verify against the upstream source.
    if self.periodic_interval:
        if self.periodic_fuzzy_delay:
            # Random start offset within the configured fuzzy-delay bound.
            initial_delay = random.randint(0, self.periodic_fuzzy_delay)
        else:
            initial_delay = None
        periodic = loopingcall.FixedIntervalLoopingCall(
            self.periodic_tasks)
        periodic.start(interval=self.periodic_interval,
                       initial_delay=initial_delay)
        self.timers.append(periodic)
    self.manager.after_start()
同时setup_rpc也创建了一个topic为L3PLUGIN = 'q-l3-plugin'的RPC-server。
#/neutron/services/l3_router/l3_router_plugin.py:L3RouterPlugin.setup_rpc
# Excerpt from setup_rpc: register an L3RpcCallback as the only endpoint,
# create a non-fanout consumer on self.topic, then start consuming.
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
self.conn.consume_in_threads()
其endpoints为/neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback对象。其中,/neutron/agent/l3/agent.py:L3PluginApi将创建topic为L3PLUGIN = 'q-l3-plugin'的RPC-client(在neutron-l3-agent服务启动时创建),用于调用/neutron/api/rpc/handlers/l3_rpc.py:L3RpcCallback类中的函数。
#/neutron/agent/l3/agent.py:L3PluginApi
class L3PluginApi(object):
    """Agent side of the l3 agent RPC API.

    API version history:
        1.0 - Initial version.
        1.1 - Floating IP operational status updates
        1.2 - DVR support: new L3 plugin methods added.
              - get_ports_by_subnet
              - get_agent_gateway_port
              Needed by the agent when operating in DVR/DVR_SNAT mode
        1.3 - Get the list of activated services
        1.4 - Added L3 HA update_router_state. This method was reworked in
              to update_ha_routers_states
        1.5 - Added update_ha_routers_states
    """

    def __init__(self, topic, host):
        """Create the RPC client the agent uses to call the plugin.

        :param topic: messaging topic of the client's target.
        :param host: stored on the instance as ``self.host``.
        """
        self.host = host
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
#/neutron/agent/l3/agent.py:L3NATAgent
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                 ha.AgentMixin,
                 dvr.AgentMixin,
                 manager.Manager):
    """Manager for L3NatAgent

    API version history:
        1.0 initial Version
        1.1 changed the type of the routers parameter
            to the routers_updated method.
            It was previously a list of routers in dict format.
            It is now a list of router IDs only.
            Per rpc versioning rules, it is backwards compatible.
        1.2 - DVR support: new L3 agent methods added.
              - add_arp_entry
              - del_arp_entry
              Needed by the L3 service when dealing with DVR
    """
    target = oslo_messaging.Target(version='1.2')

    def __init__(self, host, conf=None):
        """Initialise the agent (abridged excerpt).

        :param host: passed to ``L3PluginApi`` when creating the plugin
            RPC client.
        :param conf: configuration object; ``cfg.CONF`` is used when this
            is falsy.
        """
        if conf:
            self.conf = conf
        else:
            self.conf = cfg.CONF
        # NOTE(review): the excerpt shows a bare "self.router_info =";
        # an empty dict literal appears to have been stripped -- confirm.
        self.router_info = {}
        self._check_config_params()
        self.process_monitor = external_process.ProcessMonitor(
            config=self.conf,
            resource_type='router')
        # ... ... ... (code elided in the excerpt)
        self.context = n_context.get_admin_context_without_session()
        # RPC client on the L3 plugin topic, used to call the plugin.
        self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
        self.fullsync = True
        # ... ... ... (code elided in the excerpt)
        self._queue = queue.RouterProcessingQueue()
        super(L3NATAgent, self).__init__(conf=self.conf)
        self.target_ex_net_id = None
        self.use_ipv6 = ipv6_utils.is_enabled()
        if self.conf.enable_metadata_proxy:
            self.metadata_driver = metadata_driver.MetadataDriver(self)
#/neutron/agent/l3/agent.py:L3NATAgentWithStateReport
class L3NATAgentWithStateReport(L3NATAgent):
def __init__(self, host, conf=None):
self.use_call = True
super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
self.agent_state =
'binary': 'neutron-l3-agent',
'host': host,
'topic': topics.L3_AGENT,
'configurations':
'agent_mode': self.conf.agent_mode,
'use_namespaces': self.conf.use_namespaces,
'router_id': self.conf.router_id,
'handle_internal_only_routers':
self.conf.handle_internal_only_routers,
'external以上是关于neutron-server的启动流程的主要内容,如果未能解决你的问题,请参考以下文章