OpenStack Rocky Series, Cinder: Creating a Volume with Cinder

Posted by 苏陌宁


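The previous post covered how the cinder services start. This post walks through how OpenStack creates a volume through Cinder.

Looking at cinder's api-paste.ini, and given that the API is now at v3, we can see that the current API router is cinder/api/v3/router.py.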

  

router.py shows that every volume operation is routed by the mapper to cinder/api/v3/volumes.py, which handles it.

  

 

Here is the source that creates a volume:

    def create(self, req, body):
        # ...
        # ...
        new_volume = self.volume_api.create(context,
                                            size,
                                            volume.get('display_name'),
                                            volume.get('display_description'),
                                            **kwargs)

        retval = self._view_builder.detail(req, new_volume)

        return retval

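This calls self.volume_api.create() to create the volume. The self.volume_api attribute is inherited by VolumeController from the v2 API, where it is initialized to cinder.volume.api.API(), so the create method being called is the one in the API class in cinder/volume/api.py.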
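
The inheritance described above can be illustrated with a small sketch. The class names below are hypothetical stand-ins, not Cinder's real classes, and the create method only returns a dict instead of building a flow.

```python
class VolumeAPI:
    """Hypothetical stand-in for cinder.volume.api.API."""

    def create(self, context, size, name, description, **kwargs):
        # The real method builds a taskflow flow; this one only returns
        # a dict so that the call path is visible.
        return {"size": size, "name": name, "description": description}


class V2VolumeController:
    """Hypothetical stand-in for the v2 controller."""

    def __init__(self):
        # The attribute is set once, in the base class constructor.
        self.volume_api = VolumeAPI()


class V3VolumeController(V2VolumeController):
    """Hypothetical stand-in for the v3 controller."""

    def create(self, body):
        volume = body["volume"]
        # self.volume_api was inherited from the v2 controller.
        return self.volume_api.create(
            None,
            volume["size"],
            volume.get("display_name"),
            volume.get("display_description"),
        )


controller = V3VolumeController()
new_volume = controller.create({"volume": {"size": 1, "display_name": "demo"}})
print(new_volume)
```

The v3 controller never assigns volume_api itself, which is why the attribute has to be traced back to the parent class.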

    def create(self, context, size, name, description, snapshot=None,
               image_id=None, volume_type=None, metadata=None,
               availability_zone=None, source_volume=None,
               scheduler_hints=None,
               source_replica=None, consistencygroup=None,
               cgsnapshot=None, multiattach=False, source_cg=None,
               group=None, group_snapshot=None, source_group=None,
               backup=None):
        # ...
        # ...
        create_what = {
            'context': context,
            'raw_size': size,
            'name': name,
            'description': description,
            'snapshot': snapshot,
            'image_id': image_id,
            'raw_volume_type': volume_type,
            'metadata': metadata or {},
            'raw_availability_zone': availability_zone,
            'source_volume': source_volume,
            'scheduler_hints': scheduler_hints,
            'key_manager': self.key_manager,
            'optional_args': {'is_quota_committed': False},
            'consistencygroup': consistencygroup,
            'cgsnapshot': cgsnapshot,
            'raw_multiattach': multiattach,
            'group': group,
            'group_snapshot': group_snapshot,
            'source_group': source_group,
            'backup': backup,
        }
        try:
            sched_rpcapi = (self.scheduler_rpcapi if (
                            not cgsnapshot and not source_cg and
                            not group_snapshot and not source_group)
                            else None)
            volume_rpcapi = (self.volume_rpcapi if (
                             not cgsnapshot and not source_cg and
                             not group_snapshot and not source_group)
                             else None)
            flow_engine = create_volume.get_flow(self.db,
                                                 self.image_service,
                                                 availability_zones,
                                                 create_what,
                                                 sched_rpcapi,
                                                 volume_rpcapi)
        except Exception:
            msg = _('Failed to create api volume flow.')
            LOG.exception(msg)
            raise exception.CinderException(msg)

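This calls create_volume.get_flow, passing in the arguments and building the flow. get_flow is built on taskflow and uses its linear flow, adding five steps in order: ExtractVolumeRequestTask(), QuotaReserveTask(), EntryCreateTask(), QuotaCommitTask() and VolumeCastTask(). The last one is only added when both scheduler_rpcapi and volume_rpcapi are provided.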

def get_flow(db_api, image_service_api, availability_zones, create_what,
             scheduler_rpcapi=None, volume_rpcapi=None):
    """Constructs and returns the api entrypoint flow.

    This flow will do the following:

    1. Inject keys & values for dependent tasks.
    2. Extracts and validates the input keys & values.
    3. Reserves the quota (reverts quota on any failures).
    4. Creates the database entry.
    5. Commits the quota.
    6. Casts to volume manager or scheduler for further processing.
    """

    flow_name = ACTION.replace(":", "_") + "_api"
    api_flow = linear_flow.Flow(flow_name)

    api_flow.add(ExtractVolumeRequestTask(
        image_service_api,
        availability_zones,
        rebind={'size': 'raw_size',
                'availability_zone': 'raw_availability_zone',
                'volume_type': 'raw_volume_type',
                'multiattach': 'raw_multiattach'}))
    api_flow.add(QuotaReserveTask(),
                 EntryCreateTask(),
                 QuotaCommitTask())

    if scheduler_rpcapi and volume_rpcapi:
        # This will cast it out to either the scheduler or volume manager via
        # the rpc apis provided.
        api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(api_flow, store=create_what)

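taskflow calls the execute method of every task class that was added. taskflow is an important OpenStack library, used to build workflows whose logic needs precisely ordered steps. It covers a lot of ground, so it is not described further here.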
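
To make the idea of a linear flow more concrete, here is a minimal sketch in plain Python. It is not taskflow and does not use its API: the Task classes and the run_linear helper are hypothetical, and they only imitate the way each task's execute method is fed from a shared store of named values and adds its results back to it.

```python
import inspect


class Task:
    """Minimal task: execute() reads named inputs and may return outputs."""

    def execute(self, **kwargs):
        raise NotImplementedError


class ExtractRequest(Task):
    def execute(self, raw_size):
        # Validate and normalize the raw input.
        size = int(raw_size)
        if size <= 0:
            raise ValueError("size must be positive")
        return {"size": size}


class CreateEntry(Task):
    def execute(self, size, name):
        return {"volume": {"name": name, "size": size, "status": "creating"}}


def run_linear(tasks, store):
    """Run tasks in order, feeding each one from a shared store."""
    store = dict(store)
    for task in tasks:
        # Pick the arguments by the parameter names of execute().
        wanted = inspect.signature(task.execute).parameters
        result = task.execute(**{key: store[key] for key in wanted})
        store.update(result or {})
    return store


final = run_linear([ExtractRequest(), CreateEntry()],
                   {"raw_size": "10", "name": "demo"})
print(final["volume"])
```

The initial dict plays the role of create_what: the first task consumes raw_size and produces size, which the second task then picks up by name.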

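ExtractVolumeRequestTask validates the incoming arguments, extracts the individual parameters, uses them to select the availability zone, image and so on, and passes the results on to QuotaReserveTask.

QuotaReserveTask checks the quota and reserves it.

EntryCreateTask mainly calls cinder.objects.volume.Volume.create() to create the record in the database.

QuotaCommitTask commits the quota reservation in the database.

VolumeCastTask dispatches the task over RPC. The target of the dispatch is scheduler_rpcapi.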
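
The reserve, create, commit sequence above follows a common pattern, sketched below in plain Python. The Quota class and create_volume_entry are hypothetical and keep everything in memory. Cinder's real quota engine works against the database and is more involved.

```python
class Quota:
    """Hypothetical in-memory quota tracker (not Cinder's quota engine)."""

    def __init__(self, limit_gb):
        self.limit_gb = limit_gb
        self.in_use_gb = 0
        self.reserved_gb = 0

    def reserve(self, size_gb):
        # Reject the request if used + reserved + requested exceeds the limit.
        if self.in_use_gb + self.reserved_gb + size_gb > self.limit_gb:
            raise RuntimeError("quota exceeded")
        self.reserved_gb += size_gb
        return size_gb  # acts as the reservation handle

    def commit(self, reservation):
        # Turn the reservation into real usage.
        self.reserved_gb -= reservation
        self.in_use_gb += reservation

    def rollback(self, reservation):
        # Give the reserved space back without counting it as used.
        self.reserved_gb -= reservation


def create_volume_entry(quota, db, name, size_gb):
    reservation = quota.reserve(size_gb)
    try:
        db[name] = {"size": size_gb, "status": "creating"}
    except Exception:
        quota.rollback(reservation)
        raise
    quota.commit(reservation)
    return db[name]


quota = Quota(limit_gb=10)
db = {}
create_volume_entry(quota, db, "vol-1", 4)
try:
    create_volume_entry(quota, db, "vol-2", 8)
except RuntimeError as exc:
    rejected = str(exc)
print(quota.in_use_gb, quota.reserved_gb, rejected)
```

Reserving before the record is created means a request that would exceed the limit is rejected before anything is written.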

 

 

scheduler_rpcapi was already passed in when get_flow was called.

 

 

At this point cinder-scheduler receives the request sent by cinder-api. The code that sends the request is in cinder/scheduler/rpcapi.py:

    def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
                      request_spec=None, filter_properties=None,
                      backup_id=None):
        volume.create_worker()
        cctxt = self._get_cctxt()
        msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
                    'request_spec': request_spec,
                    'filter_properties': filter_properties,
                    'volume': volume, 'backup_id': backup_id}
        if not self.client.can_send_version('3.10'):
            msg_args.pop('backup_id')
        return cctxt.cast(ctxt, 'create_volume', **msg_args)
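
The version check in create_volume above drops backup_id when the receiving side cannot accept RPC version 3.10. A rough sketch of that kind of gating follows. The helper names and the introduced_in table are hypothetical, and the real check is done by the RPC client's can_send_version rather than by code like this.

```python
def version_tuple(version):
    """Turn '3.10' into (3, 10) so that versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def build_msg_args(server_version, **msg_args):
    """Drop arguments the receiving side is too old to understand."""
    # Hypothetical table: argument name -> first RPC version accepting it.
    introduced_in = {"backup_id": "3.10"}
    for name, minimum in introduced_in.items():
        if version_tuple(server_version) < version_tuple(minimum):
            msg_args.pop(name, None)
    return msg_args


old = build_msg_args("3.9", volume="vol-1", backup_id="b-1")
new = build_msg_args("3.10", volume="vol-1", backup_id="b-1")
print(old, new)
```

Comparing the versions as tuples of integers matters here: as plain strings, "3.9" would sort after "3.10".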

Likewise, on the cinder-scheduler side the message is received by cinder/scheduler/manager.SchedulerManager:

    @objects.Volume.set_workers
    @append_operation_type()
    def create_volume(self, context, volume, snapshot_id=None, image_id=None,
                      request_spec=None, filter_properties=None,
                      backup_id=None):
        self._wait_for_scheduler()

        try:
            flow_engine = create_volume.get_flow(context,
                                                 self.driver,
                                                 request_spec,
                                                 filter_properties,
                                                 volume,
                                                 snapshot_id,
                                                 image_id,
                                                 backup_id)
        except Exception:
            msg = _("Failed to create scheduler manager volume flow")
            LOG.exception(msg)
            raise exception.CinderException(msg)

        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
            flow_engine.run()

Here cinder-scheduler also uses the taskflow model to create the volume. Let's look at the task classes in this get_flow:

def get_flow(context, driver_api, request_spec=None,
             filter_properties=None,
             volume=None, snapshot_id=None, image_id=None, backup_id=None):

    create_what = {
        'context': context,
        'raw_request_spec': request_spec,
        'filter_properties': filter_properties,
        'volume': volume,
        'snapshot_id': snapshot_id,
        'image_id': image_id,
        'backup_id': backup_id,
    }

    flow_name = ACTION.replace(":", "_") + "_scheduler"
    scheduler_flow = linear_flow.Flow(flow_name)

    # This will extract and clean the spec from the starting values.
    scheduler_flow.add(ExtractSchedulerSpecTask(
        rebind={'request_spec': 'raw_request_spec'}))

    # This will activate the desired scheduler driver (and handle any
    # driver related failures appropriately).
    scheduler_flow.add(ScheduleCreateVolumeTask(driver_api))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(scheduler_flow, store=create_what)

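ExtractSchedulerSpecTask likewise extracts and processes the request parameters for the later steps.

The execute method of ScheduleCreateVolumeTask does two things: 1. it calls driver_api to schedule the creation of the volume; 2. if that fails, it records the failure through message_api, handles the failure, and marks the volume as errored.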

    def execute(self, context, request_spec, filter_properties, volume):
        try:
            self.driver_api.schedule_create_volume(context, request_spec,
                                                   filter_properties)
        except Exception as e:
            self.message_api.create(
                context,
                message_field.Action.SCHEDULE_ALLOCATE_VOLUME,
                resource_uuid=request_spec['volume_id'],
                exception=e)
            with excutils.save_and_reraise_exception(
                    reraise=not isinstance(e, exception.NoValidBackend)):
                try:
                    self._handle_failure(context, request_spec, e)
                finally:
                    common.error_out(volume, reason=e)
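
In the execute method above, the exception is re-raised after the cleanup unless it is a NoValidBackend. The sketch below imitates that control flow with a simplified context manager. It is not oslo.utils' save_and_reraise_exception, which does more, and the names save_and_maybe_reraise and schedule are hypothetical.

```python
import contextlib


class NoValidBackend(Exception):
    """Hypothetical stand-in for cinder.exception.NoValidBackend."""


@contextlib.contextmanager
def save_and_maybe_reraise(exc, reraise=True):
    """Run the cleanup body, then re-raise exc unless reraise is False."""
    yield
    if reraise:
        raise exc


def schedule(fail_with, log):
    try:
        raise fail_with
    except Exception as e:
        with save_and_maybe_reraise(
                e, reraise=not isinstance(e, NoValidBackend)):
            log.append("volume marked as error: %s" % e)


log = []
schedule(NoValidBackend("no backend"), log)   # swallowed after cleanup
try:
    schedule(ValueError("boom"), log)         # re-raised after cleanup
except ValueError:
    log.append("caller saw ValueError")
print(log)
```

In both cases the cleanup runs. Only the unexpected error travels further up the call stack.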

Here driver_api is the scheduler_driver that was set when SchedulerManager was initialized.

 

 

So driver_api is cinder.scheduler.filter_scheduler.FilterScheduler:

    def schedule_create_volume(self, context, request_spec, filter_properties):
        backend = self._schedule(context, request_spec, filter_properties)

        if not backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = backend.obj
        volume_id = request_spec['volume_id']

        updated_volume = driver.volume_update_db(
            context, volume_id,
            backend.host,
            backend.cluster_name,
            availability_zone=backend.service['availability_zone'])
        self._post_select_populate_filter_properties(filter_properties,
                                                     backend)

        # context is not serializable
        filter_properties.pop('context', None)

        self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
                                         filter_properties,
                                         allow_reschedule=True)

self._schedule uses the arguments passed in to select a backend (when there is more than one backend).

Finally it calls self.volume_rpcapi.create_volume to create the volume. Here volume_rpcapi is volume_rpcapi.VolumeAPI():
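
A filter scheduler first filters out backends that cannot serve the request and then weighs the remaining ones. The sketch below shows the idea with a single capacity filter and a single capacity weigher. The backend dicts, host names and pick_backend are hypothetical, and the filters and weighers actually configured in a deployment will differ.

```python
def pick_backend(backends, size_gb):
    """Filter out backends that cannot fit the volume, then take the best."""
    # Filtering step: keep only backends with enough free capacity.
    candidates = [b for b in backends if b["free_gb"] >= size_gb]
    if not candidates:
        return None
    # Weighing step: here, more free capacity means a higher weight.
    return max(candidates, key=lambda b: b["free_gb"])


backends = [
    {"host": "node1@lvm", "free_gb": 20},
    {"host": "node2@ceph", "free_gb": 500},
    {"host": "node3@lvm", "free_gb": 5},
]
chosen = pick_backend(backends, size_gb=10)
print(chosen["host"])
print(pick_backend(backends, size_gb=1000))
```

When no backend passes the filter the function returns None, which corresponds to the case where schedule_create_volume raises NoValidBackend.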

  

    def create_volume(self, ctxt, volume, request_spec, filter_properties,
                      allow_reschedule=True):
        cctxt = self._get_cctxt(volume.service_topic_queue)
        cctxt.cast(ctxt, 'create_volume',
                   request_spec=request_spec,
                   filter_properties=filter_properties,
                   allow_reschedule=allow_reschedule,
                   volume=volume)

At this point cinder-scheduler sends the create-volume message to cinder-volume over RPC. The message is received by cinder-volume's VolumeManager:

    @objects.Volume.set_workers
    def create_volume(self, context, volume, request_spec=None,
                      filter_properties=None, allow_reschedule=True):
        """Creates the volume."""
        utils.log_unsupported_driver_warning(self.driver)

        self._set_resource_host(volume)

        self._update_allocated_capacity(volume)
        # We lose the host value if we reschedule, so keep it here
        original_host = volume.host

        context_elevated = context.elevated()
        if filter_properties is None:
            filter_properties = {}

        if request_spec is None:
            request_spec = objects.RequestSpec()

        try:
            # NOTE(flaper87): Driver initialization is
            # verified by the task itself.
            flow_engine = create_volume.get_flow(
                context_elevated,
                self,
                self.db,
                self.driver,
                self.scheduler_rpcapi,
                self.host,
                volume,
                allow_reschedule,
                context,
                request_spec,
                filter_properties,
                image_volume_cache=self.image_volume_cache,
            )
        except Exception:
            msg = _("Create manager volume flow failed.")
            LOG.exception(msg, resource={'type': 'volume', 'id': volume.id})
            raise exception.CinderException(msg)

        snapshot_id = request_spec.get('snapshot_id')
        source_volid = request_spec.get('source_volid')

        if snapshot_id is not None:
            # Make sure the snapshot is not deleted until we are done with it.
            locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
        elif source_volid is not None:
            # Make sure the volume is not deleted until we are done with it.
            locked_action = "%s-%s" % (source_volid, 'delete_volume')
        else:
            locked_action = None

        def _run_flow():
            # This code executes create volume flow. If something goes wrong,
            # flow reverts all job that was done and reraises an exception.
            # Otherwise, all data that was generated by flow becomes available
            # in flow engine's storage.
            with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
                flow_engine.run()

        # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
        # decide if allocated_capacity should be incremented.
        rescheduled = False

        try:
            if locked_action is None:
                _run_flow()
            else:
                with coordination.COORDINATOR.get_lock(locked_action):
                    _run_flow()
        finally:
            try:
                flow_engine.storage.fetch('refreshed')
            except tfe.NotFound:
                # If there's no vol_ref, then flow is reverted. Lets check out
                # if rescheduling occurred.
                try:
                    rescheduled = flow_engine.storage.get_revert_result(
                        create_volume.OnFailureRescheduleTask.make_name(
                            [create_volume.ACTION]))
                except tfe.NotFound:
                    pass

            if rescheduled:
                # NOTE(geguileo): Volume was rescheduled so we need to update
                # volume stats because the volume wasn't created here.
                # Volume.host is None now, so we pass the original host value.
                self._update_allocated_capacity(volume, decrement=True,
                                                host=original_host)

        # Shared targets is only relevant for iSCSI connections.
        # We default to True to be on the safe side.
        volume.shared_targets = (
            self.driver.capabilities.get('storage_protocol') == 'iSCSI' and
            self.driver.capabilities.get('shared_targets', True))
        # TODO(geguileo): service_uuid won't be enough on Active/Active
        # deployments. There can be 2 services handling volumes from the same
        # backend.
        volume.service_uuid = self.service_uuid
        volume.save()

        LOG.info("Created volume successfully.", resource=volume)
        return volume.id

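The code above uses taskflow as well:

ExtractVolumeRefTask fetches the volume's details from the database.

OnFailureRescheduleTask does nothing in execute, but its revert does: it exists so that, if a later step fails, part of the recovery work is done during rollback.

ExtractVolumeSpecTask extracts the spec information.

NotifyVolumeActionTask broadcasts a notification that volume creation has started.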
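
A task whose execute is empty but whose revert does the work, as described for OnFailureRescheduleTask, can be sketched in plain Python as follows. This is a simplification and not taskflow's engine. The class names and run_with_revert are hypothetical, and only the tasks that already finished are reverted.

```python
class RescheduleOnFailure:
    """execute() does nothing; revert() reacts to a later failure."""

    def __init__(self, log):
        self.log = log

    def execute(self):
        pass

    def revert(self):
        self.log.append("rescheduled")


class CreateOnBackend:
    def __init__(self, log, fail):
        self.log = log
        self.fail = fail

    def execute(self):
        if self.fail:
            raise RuntimeError("driver error")
        self.log.append("created")

    def revert(self):
        self.log.append("cleaned up")


def run_with_revert(tasks):
    """Run tasks in order; on failure revert the finished ones in reverse."""
    done = []
    try:
        for task in tasks:
            task.execute()
            done.append(task)
    except Exception:
        for task in reversed(done):
            task.revert()
        return False
    return True


log = []
ok = run_with_revert([RescheduleOnFailure(log), CreateOnBackend(log, fail=True)])
print(ok, log)
```

Placing the revert-only task early in the flow is what lets it react to a failure in any step that comes after it.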

def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume,
             allow_reschedule, reschedule_context, request_spec,
             filter_properties, image_volume_cache=None):

    flow_name = ACTION.replace(":", "_") + "_manager"
    volume_flow = linear_flow.Flow(flow_name)

    # This injects the initial starting flow values into the workflow so that
    # the dependency order of the tasks provides/requires can be correctly
    # determined.
    create_what = {
        'context': context,
        'filter_properties
        ...
