[Source Code Analysis] Kubernetes CSI vs. OpenStack Cinder: How Snapshots Are Created on Ceph RBD, with a Source Code Walkthrough

Posted by cocoㄋㄟㄋㄟ好喝到咩噗茶



Ceph version: v14.2.20 (nautilus)
Cinder version: v3.5.0 (queens)
ceph-csi version: release-v3.3

Cinder

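Cinder is the OpenStack subproject that provides the block storage service. It is written in Python and mainly provides virtual disks for virtual machine instances.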

Code:
openstack/cinder:queens

  • Creating a snapshot:
    The code is in cinder/cinder/volume/drivers/rbd.py,
    method create_snapshot
    def create_snapshot(self, snapshot):
        """Creates an rbd snapshot."""
        with RBDVolumeProxy(self, snapshot.volume_name) as volume:
            snap = utils.convert_str(snapshot.name)
            volume.create_snap(snap)
            volume.protect_snap(snap)

The process is simple:

  1. Create the snapshot directly.
  2. Protect the snapshot.
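Cinder protects every snapshot right after creating it, presumably so that create_volume_from_snapshot can clone it later: under RBD's original clone format (clone v1) only a protected snapshot can serve as a clone parent, and a protected snapshot cannot be removed. The sketch below is a hypothetical in-memory model of just those two rules; `ToyImage`, `RBDError` and `can_clone_from` are invented for illustration and are not the python-rbd API.

```python
class RBDError(Exception):
    """Error raised by the toy model (hypothetical, not rbd.Error)."""


class ToyImage:
    """Hypothetical in-memory RBD image that only tracks snapshots.

    Method names mimic the python-rbd calls used by the Cinder driver,
    but this is not the real rbd module.
    """

    def __init__(self, name):
        self.name = name
        self.snaps = {}  # snapshot name -> protected flag

    def create_snap(self, snap):
        if snap in self.snaps:
            raise RBDError(f"snapshot {snap} already exists")
        self.snaps[snap] = False

    def protect_snap(self, snap):
        self.snaps[snap] = True

    def remove_snap(self, snap):
        # Clone v1 rule: a protected snapshot must be unprotected first.
        if self.snaps[snap]:
            raise RBDError(f"snapshot {snap} is protected")
        del self.snaps[snap]

    def can_clone_from(self, snap):
        # Clone v1 rule: only a protected snapshot can be a clone parent.
        return self.snaps.get(snap, False)


def create_snapshot(volume, snap_name):
    """Same two steps as Cinder's create_snapshot: create, then protect."""
    volume.create_snap(snap_name)
    volume.protect_snap(snap_name)


vol = ToyImage("volume-1")
create_snapshot(vol, "snapshot-1")
print(vol.can_clone_from("snapshot-1"))  # True
try:
    vol.remove_snap("snapshot-1")
except RBDError as err:
    print("remove failed:", err)  # remove failed: snapshot snapshot-1 is protected
```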
  • Restoring a volume from a snapshot
    The code is in cinder/cinder/volume/drivers/rbd.py,
    method create_volume_from_snapshot
    def create_volume_from_snapshot(self, volume, snapshot):
        """Creates a volume from a snapshot."""
        volume_update = self._clone(volume, self.configuration.rbd_pool,
                                    snapshot.volume_name, snapshot.name)
        if self.configuration.rbd_flatten_volume_from_snapshot:
            self._flatten(self.configuration.rbd_pool, volume.name)
        if int(volume.size):
            self._resize(volume)
        return volume_update

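This process is also simple:
1. Clone the snapshot directly.
2. Depending on the rbd_flatten_volume_from_snapshot option, flatten the image cloned from the snapshot.
3. If the volume has a size set, resize (extend) the new image to that size.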
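A minimal copy-on-write sketch of the three steps above, assuming a toy model in which a clone reads through to a frozen copy of its parent snapshot until it is flattened. `ToyImage`, `clone`, `flatten` and the `flatten_after_clone` flag (standing in for rbd_flatten_volume_from_snapshot) are hypothetical; sizes are plain numbers rather than GB.

```python
class ToyImage:
    """Hypothetical copy-on-write image model (not the python-rbd API)."""

    def __init__(self, name, size, data=None, parent=None):
        self.name = name
        self.size = size
        self.data = dict(data or {})  # block index -> bytes written to this image
        self.parent = parent          # frozen parent snapshot contents, or None
        self.snaps = {}               # snapshot name -> frozen contents

    def read_all(self):
        # Parent contents overlaid with this image's own writes.
        merged = dict(self.parent or {})
        merged.update(self.data)
        return merged

    def read(self, block):
        return self.read_all().get(block)

    def create_snap(self, snap):
        self.snaps[snap] = self.read_all()


def clone(src, snap, dest_name):
    # A clone starts empty and reads through to the parent snapshot.
    return ToyImage(dest_name, src.size, parent=src.snaps[snap])


def flatten(image):
    # Copy everything the parent provides, then drop the parent link.
    image.data = image.read_all()
    image.parent = None


def create_volume_from_snapshot(src, snap, dest_name, size, flatten_after_clone):
    """Mirrors the three steps of Cinder's create_volume_from_snapshot."""
    dest = clone(src, snap, dest_name)
    if flatten_after_clone:  # stands in for rbd_flatten_volume_from_snapshot
        flatten(dest)
    if size:                 # stands in for `if int(volume.size): self._resize(volume)`
        dest.size = size
    return dest


src = ToyImage("volume-1", size=10, data={0: b"a"})
src.create_snap("snapshot-1")
src.data[0] = b"b"  # later writes to the source do not change the snapshot
linked = create_volume_from_snapshot(src, "snapshot-1", "volume-2", 20, False)
flat = create_volume_from_snapshot(src, "snapshot-1", "volume-3", 20, True)
print(linked.read(0), linked.parent is not None)  # b'a' True
print(flat.read(0), flat.parent is None)          # b'a' True
```

The linked clone still depends on the source's snapshot, while the flattened one holds its own copy of the data and has no parent; both see the snapshot contents rather than the later write.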

  • Cloning a volume
    The code is in cinder/cinder/volume/drivers/rbd.py,
    method create_cloned_volume
    def create_cloned_volume(self, volume, src_vref):
        """Create a cloned volume from another volume.

        Since we are cloning from a volume and not a snapshot, we must first
        create a snapshot of the source volume.

        The user has the option to limit how long a volume's clone chain can be
        by setting rbd_max_clone_depth. If a clone is made of another clone
        and that clone has rbd_max_clone_depth clones behind it, the dest
        volume will be flattened.
        """
        src_name = utils.convert_str(src_vref.name)
        dest_name = utils.convert_str(volume.name)
        clone_snap = "%s.clone_snap" % dest_name

        # Do full copy if requested
        if self.configuration.rbd_max_clone_depth <= 0:
            with RBDVolumeProxy(self, src_name, read_only=True) as vol:
                vol.copy(vol.ioctx, dest_name)
                self._extend_if_required(volume, src_vref)
            return

        # Otherwise do COW clone.
        with RADOSClient(self) as client:
            src_volume = self.rbd.Image(client.ioctx, src_name)
            LOG.debug("creating snapshot='%s'", clone_snap)
            try:
                # Create new snapshot of source volume
                src_volume.create_snap(clone_snap)
                src_volume.protect_snap(clone_snap)
                # Now clone source volume snapshot
                LOG.debug("cloning '%(src_vol)s@%(src_snap)s' to "
                          "'%(dest)s'",
                          {'src_vol': src_name, 'src_snap': clone_snap,
                           'dest': dest_name})
                self.RBDProxy().clone(client.ioctx, src_name, clone_snap,
                                      client.ioctx, dest_name,
                                      features=client.features)
            except Exception as e:
                src_volume.unprotect_snap(clone_snap)
                src_volume.remove_snap(clone_snap)
                src_volume.close()
                msg = (_("Failed to clone '%(src_vol)s@%(src_snap)s' to "
                         "'%(dest)s', error: %(error)s") %
                       {'src_vol': src_name,
                        'src_snap': clone_snap,
                        'dest': dest_name,
                        'error': e})
                LOG.exception(msg)
                raise exception.VolumeBackendAPIException(data=msg)

            depth = self._get_clone_depth(client, src_name)
            # If dest volume is a clone and rbd_max_clone_depth reached,
            # flatten the dest after cloning. Zero rbd_max_clone_depth means
            # infinite is allowed.
            if depth >= self.configuration.rbd_max_clone_depth:
                LOG.info("maximum clone depth (%d) has been reached - "
                         "flattening dest volume",
                         self.configuration.rbd_max_clone_depth)
                dest_volume = self.rbd.Image(client.ioctx, dest_name)
                try:
                    # Flatten destination volume
                    LOG.debug("flattening dest volume %s", dest_name)
                    dest_volume.flatten()
                except Exception as e:
                    msg = (_("Failed to flatten volume %(volume)s with "
                             "error: %(error)s.") %
                           {'volume': dest_name,
                            'error': e})
                    LOG.exception(msg)
                    src_volume.close()
                    raise exception.VolumeBackendAPIException(data=msg)
                finally:
                    dest_volume.close()

                try:
                    # remove temporary snap
                    LOG.debug("remove temporary snap %s", clone_snap)
                    src_volume.unprotect_snap(clone_snap)
                    src_volume.remove_snap(clone_snap)
                except Exception as e:
                    msg = (_("Failed to remove temporary snap "
                             "%(snap_name)s, error: %(error)s") %
                           {'snap_name': clone_snap,
                            'error': e})
                    LOG.exception(msg)
                    src_volume.close()
                    raise exception.VolumeBackendAPIException(data=msg)

            try:
                volume_update = self._enable_replication_if_needed(volume)
            except Exception:
                self.RBDProxy().remove(client.ioctx, dest_name)
                src_volume.unprotect_snap(clone_snap)
                src_volume.remove_snap(clone_snap)
                err_msg = (_('Failed to enable image replication'))
                raise exception.ReplicationError(reason=err_msg,
                                                 volume_id=volume.id)
            finally:
                src_volume.close()

            self._extend_if_required(volume, src_vref)

        LOG.debug("clone created successfully")
        return volume_update

The logic is long, so we only look at the key part:

    src_volume.create_snap(clone_snap)
    src_volume.protect_snap(clone_snap)
    self.RBDProxy().clone(client.ioctx, src_name, clone_snap,
                          client.ioctx, dest_name,
                          features=client.features)

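Leaving aside error handling and the full-copy path taken when rbd_max_clone_depth <= 0, it comes down to 3 steps:
1. Create a snapshot.
2. Protect the snapshot.
3. Clone the snapshot.
When the source's clone depth reaches rbd_max_clone_depth, the new volume is additionally flattened and the temporary snapshot is unprotected and removed.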
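The clone-depth rule in create_cloned_volume can be sketched with a toy model of clone v1 (protect before cloning, no unprotect while clones exist). This assumes `_get_clone_depth` counts how many clone ancestors the source volume has; `ToyImage`, `clone`, `flatten` and `clone_depth` are hypothetical helpers, not Cinder or python-rbd code, and the full-copy branch does not model any data.

```python
class ToyImage:
    """Hypothetical image model with RBD clone v1 rules (not python-rbd)."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent  # (parent ToyImage, snapshot name) or None
        self.snaps = {}       # snapshot name -> {"protected": bool, "children": set}

    def create_snap(self, snap):
        self.snaps[snap] = {"protected": False, "children": set()}

    def protect_snap(self, snap):
        self.snaps[snap]["protected"] = True

    def unprotect_snap(self, snap):
        if self.snaps[snap]["children"]:
            raise RuntimeError(f"{self.name}@{snap} still has clones")
        self.snaps[snap]["protected"] = False

    def remove_snap(self, snap):
        if self.snaps[snap]["protected"]:
            raise RuntimeError(f"{self.name}@{snap} is protected")
        del self.snaps[snap]


def clone(parent, snap, dest_name):
    info = parent.snaps[snap]
    if not info["protected"]:
        raise RuntimeError("clone v1 needs a protected snapshot")
    info["children"].add(dest_name)
    return ToyImage(dest_name, parent=(parent, snap))


def flatten(image):
    parent, snap = image.parent
    parent.snaps[snap]["children"].discard(image.name)
    image.parent = None


def clone_depth(image):
    """Number of clone ancestors (assumed to match Cinder's _get_clone_depth)."""
    depth = 0
    while image.parent is not None:
        image = image.parent[0]
        depth += 1
    return depth


def create_cloned_volume(src, dest_name, max_clone_depth):
    if max_clone_depth <= 0:
        return ToyImage(dest_name)  # full copy: no parent link (data not modelled)
    clone_snap = f"{dest_name}.clone_snap"
    src.create_snap(clone_snap)
    src.protect_snap(clone_snap)
    dest = clone(src, clone_snap, dest_name)
    if clone_depth(src) >= max_clone_depth:
        flatten(dest)  # detach dest, then drop the temporary snapshot
        src.unprotect_snap(clone_snap)
        src.remove_snap(clone_snap)
    return dest


volumes = [ToyImage("volume-0")]
for i in range(1, 5):
    volumes.append(create_cloned_volume(volumes[-1], f"volume-{i}", max_clone_depth=2))
print([clone_depth(v) for v in volumes])  # [0, 1, 2, 0, 1]
```

With a limit of 2 the chain grows to depth 2, the next clone is flattened (resetting the depth), and the temporary `.clone_snap` on its source is removed, while the other temporary snapshots stay protected as clone parents.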

Ceph-csi

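CSI (Container Storage Interface) is the container storage interface that Kubernetes introduced in version 1.9. It defines a standard set of storage-management interfaces between Kubernetes and external storage systems, through which storage is provided to containers.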

ceph-csi is the CSI implementation for Ceph, the open-source distributed storage system. It is written in Go and implements both RBD block storage and CephFS file storage.
Code:
ceph-csi:release-v3.3

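  • Creating a snapshot
    The code is in internal/rbd/controllerserver.go.
    The entry point is CreateSnapshot; we go straight to the key function doSnapshotClone (the name is a bit odd; if my analysis is wrong, corrections are welcome):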
func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) (bool, *rbdVolume, error) {
	// generate cloned volume details from snapshot
	cloneRbd := generateVolFromSnap(rbdSnap)
	defer cloneRbd.Destroy()
	// add image feature for cloneRbd
	f := []string{librbd.FeatureNameLayering, librbd.FeatureNameDeepFlatten}
	cloneRbd.imageFeatureSet = librbd.FeatureSetFromNames(f)
	ready := false

	err := cloneRbd.Connect(cr)
	if err != nil {
		return ready, cloneRbd, err
	}

	err = createRBDClone(ctx, parentVol, cloneRbd, rbdSnap, cr)
	if err != nil {
		util.ErrorLog(ctx, "failed to create snapshot: %v", err)
		return ready, cloneRbd, status.Error(codes.Internal, err.Error())
	}

	defer func() {
		if err != nil {
			if !errors.Is(err, ErrFlattenInProgress) {
				// cleanup clone and snapshot
				errCleanUp := cleanUpSnapshot(ctx, cloneRbd, rbdSnap, cloneRbd, cr)
				if errCleanUp != nil {
					util.ErrorLog(ctx, "failed to cleanup snapshot and clone: %v", errCleanUp)
				}
			}
		}
	}()

	if parentVol.isEncrypted() {
		cryptErr := parentVol.copyEncryptionConfig(&cloneRbd.rbdImage)
		if cryptErr != nil {
			util.WarningLog(ctx, "failed copy encryption "+
				"config for %q: %v", cloneRbd.String(), cryptErr)
			// upstream passes err.Error() here, but err is nil at this point, so report cryptErr
			return ready, nil, status.Error(codes.Internal, cryptErr.Error())
		}
	}

	err = cloneRbd.createSnapshot(ctx, rbdSnap)
	if err != nil {
		// update rbd image name for logging
		rbdSnap.RbdImageName = cloneRbd.RbdImageName
		util.ErrorLog(ctx, "failed to create snapshot %s: %v", rbdSnap, err)
		return ready, cloneRbd, err
	}

	err = cloneRbd.getImageID()
	if err != nil {
		util.ErrorLog(ctx, "failed to get image id: %v", err)
		return ready, cloneRbd, err
	}
	var j = &journal.Connection{}
	// save image ID
	j, err = snapJournal.Connect(rbdSnap.Monitors, rbdSnap.RadosNamespace, cr)
	if err != nil {
		util.ErrorLog(ctx, "failed to connect to cluster: %v", err)
		return ready, cloneRbd, err
	}
	defer j.Destroy()

	err = j.StoreImageID(ctx, rbdSnap.JournalPool, rbdSnap.ReservedID, cloneRbd.ImageID)
	if err != nil {
		util.ErrorLog(ctx, "failed to reserve volume id: %v", err)
		return ready, cloneRbd, err
	}

	err = cloneRbd.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
	if err != nil {
		if errors.Is(err, ErrFlattenInProgress) {
			return ready, cloneRbd, nil
		}
		return ready, cloneRbd, err
	}
	ready = true
	return ready, cloneRbd, nil
}

The first step, createRBDClone(ctx, parentVol, cloneRbd, rbdSnap, cr), is more involved and needs a closer look.
The code is in internal/rbd/snapshot.go:

func createRBDClone(ctx context.Context, parentVol, cloneRbdVol *rbdVolume, snap *rbdSnapshot, cr *util.Credentials) error {
	// create snapshot
	err := parentVol.createSnapshot(ctx, snap)
	if err != nil {
		util.ErrorLog(ctx, "failed to create snapshot %s: %v", snap, err)
		return err
	}

	snap.RbdImageName = parentVol.RbdImageName
	// create clone image and delete snapshot
	err = cloneRbdVol.cloneRbdImageFromSnapshot(ctx, snap)
	if err != nil {
		util.ErrorLog(ctx, "failed to clone rbd image %s from snapshot %s: %v", cloneRbdVol.RbdImageName, snap.RbdSnapName, err)
		err = fmt.Errorf("failed to clone rbd image %s from snapshot %s: %w", cloneRbdVol.RbdImageName, snap.RbdSnapName, err)
	}
	errSnap := parentVol.deleteSnapshot(ctx, snap)
	if errSnap != nil {
		util.ErrorLog(ctx, "failed to delete snapshot: %v", errSnap)
		delErr := deleteImage(ctx, cloneRbdVol, cr)
		if delErr != nil {
			util.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", cloneRbdVol, delErr)
		}
		return err
	}

	err = cloneRbdVol.getImageInfo()
	if err != nil {
		util.ErrorLog(ctx, "failed to get rbd image: %s details with error: %v", cloneRbdVol, err)
		delErr := deleteImage(ctx, cloneRbdVol, cr)
		if delErr != nil {
			util.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", cloneRbdVol, delErr)
		}
		return err
	}

	return nil
}

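We ignore the many checks and error paths and look only at the normal flow:
1. Clone
   a. Create a snapshot of the source image.
   b. Clone that snapshot.
   c. Delete the snapshot created in 1.a.
2. Create a snapshot on the image cloned in 1.b.
3. Flatten the image cloned in 1.b. flattenRbdImage is called with false (not forced), so whether it actually flattens depends on the image's clone depth compared with rbdSoftMaxCloneDepth and rbdHardMaxCloneDepth; if the flatten is still in progress, the snapshot is reported as not yet ready (ready stays false).

In other words, ceph-csi stores each CSI snapshot as its own cloned image carrying a snapshot of the same name, which is presumably what the name doSnapshotClone refers to.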

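Open question: the source contains no step that protects or unprotects the snapshot, yet it clones from the snapshot and then deletes it. A likely explanation is that ceph-csi relies on RBD clone v2 (available since Ceph Mimic), under which a snapshot can be cloned without being protected, and a snapshot that still has clones can be deleted: it is moved to a trash namespace until its clones are flattened or removed. Discussion and corrections are welcome.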
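The normal ceph-csi snapshot flow can be modelled under the assumption that ceph-csi relies on RBD clone v2, where no protect step is needed and a snapshot deleted while it still has clones is kept in a trash namespace until the last clone is flattened or removed. `ToyImage`, `create_rbd_clone`, `do_snapshot_clone` and the `flatten_now` flag (a stand-in for flattenRbdImage's clone-depth decision) are hypothetical, and the image and snapshot names are made up.

```python
class ToyImage:
    """Hypothetical image model with RBD clone v2 semantics (not librbd)."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent  # (parent ToyImage, snapshot name) or None
        self.snaps = {}       # live snapshot name -> names of its clones
        self.trash = {}       # removed snapshots that still have clones

    def create_snap(self, snap):
        self.snaps[snap] = set()

    def remove_snap(self, snap):
        children = self.snaps.pop(snap)
        if children:
            # Clone v2 (as assumed here): kept in the trash while clones use it.
            self.trash[snap] = children


def clone(parent, snap, dest_name):
    # Clone v2: no protect step is required before cloning.
    parent.snaps[snap].add(dest_name)
    return ToyImage(dest_name, parent=(parent, snap))


def flatten(image):
    parent, snap = image.parent
    holder = parent.snaps if snap in parent.snaps else parent.trash
    holder[snap].discard(image.name)
    if holder is parent.trash and not holder[snap]:
        del parent.trash[snap]  # last clone detached: trashed snapshot is purged
    image.parent = None


def create_rbd_clone(parent, clone_name, snap_name):
    """Like createRBDClone: temporary snapshot, clone, delete the snapshot."""
    parent.create_snap(snap_name)
    child = clone(parent, snap_name, clone_name)
    parent.remove_snap(snap_name)
    return child


def do_snapshot_clone(parent, snap_name, flatten_now):
    """Like doSnapshotClone: back the CSI snapshot with its own image."""
    backing = create_rbd_clone(parent, snap_name, snap_name)
    backing.create_snap(snap_name)  # snapshot named after its backing image
    if flatten_now:  # stands in for flattenRbdImage's clone-depth decision
        flatten(backing)
    return backing


pvc = ToyImage("csi-vol-1")
snap = do_snapshot_clone(pvc, "csi-snap-1", flatten_now=False)
print(pvc.snaps, pvc.trash)  # {} {'csi-snap-1': {'csi-snap-1'}}
flatten(snap)
print(pvc.trash, snap.parent)  # {} None
```

The source volume ends up with no visible snapshot; the CSI snapshot lives on its own image, and only that image's parent link pins the trashed snapshot until the image is flattened.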

  • Restoring from a snapshot
    The code is in internal/rbd/controllerserver.go,
    function createVolumeFromSnapshot
func (cs *ControllerServer) createVolumeFromSnapshot(ctx context.Context, cr *util.Credentials, secrets map[string]string, rbdVol *rbdVolume, snapshotID string) error {
	rbdSnap := &rbdSnapshot{}
	if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
		util.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotID)
		return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, snapshotID)
	}
	defer cs.SnapshotLocks.Release(snapshotID)

	err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr, secrets)
	if err != nil {
		if errors.Is(err, util.ErrPoolNotFound) {
			util.ErrorLog(ctx, "failed to get backend snapshot for %s: %v", snapshotID, err)
			return status.Error(codes.InvalidArgument, err.Error())
		}
		return status.Error(codes.Internal, err.Error())
	}

	// update parent name(rbd image name in snapshot)
	rbdSnap.RbdImageName = rbdSnap.RbdSnapName
	// create clone image and delete snapshot
	err = rbdVol.cloneRbdImageFromSnapshot(ctx, rbdSnap)
	if err != nil {
		util.ErrorLog(ctx, "failed to clone rbd image %s from snapshot %s: %v", rbdVol, rbdSnap, err)
		return err
	}

	util.DebugLog(ctx, "create volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName)
	return nil
}

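This process is simple: it only clones the snapshot. Because each CSI snapshot lives on a backing image with the same name as the snapshot (see doSnapshotClone), the code first sets rbdSnap.RbdImageName to rbdSnap.RbdSnapName and then clones from that image's snapshot.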
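Restoring from a CSI snapshot is therefore a single clone from the backing image's snapshot. A minimal sketch, assuming the naming convention visible in the Go code (backing image name equals snapshot name); `ToyImage`, the `pool` dictionary and the image names are hypothetical.

```python
class ToyImage:
    """Hypothetical image model; only names, parent links and snapshots."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent  # "image@snapshot" of the clone parent, or None
        self.snaps = set()


def create_volume_from_snapshot(pool, new_name, snap_name):
    """Like createVolumeFromSnapshot; `pool` maps image names to ToyImage."""
    # The CSI snapshot's backing image has the same name as the snapshot,
    # hence rbdSnap.RbdImageName = rbdSnap.RbdSnapName in the Go code.
    image_name = snap_name
    backing = pool[image_name]
    if snap_name not in backing.snaps:
        raise KeyError(f"{image_name}@{snap_name} not found")
    volume = ToyImage(new_name, parent=f"{image_name}@{snap_name}")
    pool[new_name] = volume
    return volume


pool = {"csi-snap-1": ToyImage("csi-snap-1")}
pool["csi-snap-1"].snaps.add("csi-snap-1")
restored = create_volume_from_snapshot(pool, "csi-vol-2", "csi-snap-1")
print(restored.parent)  # csi-snap-1@csi-snap-1
```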

  • Cloning a volume
    The code is in internal/rbd/clone.go,
    function createCloneFromImage
func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVolume) error {
	// generate temp cloned volume
	tempClone := rv.generateTempClone()
	// snapshot name is same as temporary cloned image, This helps to
	// flatten the temporary cloned images as we cannot have more than 510
	// snapshots on an rbd image
	tempSnap := &rbdSnapshot{}
	tempSnap.RbdSnapName = tempClone.RbdImageName
	tempSnap.Pool = rv.Pool

	cloneSnap := &rbdSnapshot{}
	cloneSnap.RbdSnapName = rv.RbdImageName
	cloneSnap.Pool = rv.Pool

	var (
		errClone   error
		errFlatten error
		err        error
	)
	var j = &journal.Connection{}

	j, err = volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds)
	if err != nil {
		return status.Error(codes.Internal, err.Error())
	}
	defer j.Destroy()

	// create snapshot and temporary clone and delete snapshot
	err = createRBDClone(ctx, parentVol, tempClone, tempSnap, rv.conn.Creds)
	if err != nil {
		return err
	}

	defer func() {
		if err != nil || errClone != nil {
			cErr := cleanUpSnapshot(ctx, tempClone, cloneSnap, rv, rv.conn.Creds)
			if cErr != nil {
				util.ErrorLog(ctx, "failed to cleanup image %s or snapshot %s: %v", cloneSnap, tempClone, cErr)
			}
		}

		if err != nil || errFlatten != nil {
			if !errors.Is(errFlatten, ErrFlattenInProgress) {
				// cleanup snapshot
				cErr := cleanUpSnapshot(ctx, parentVol, tempSnap, tempClone, rv.conn.Creds)
				if cErr != nil {
					util.ErrorLog(ctx, "failed to cleanup image %s or snapshot %s: %v", tempSnap, tempClone, cErr)
				}
			}
		}
	}()
	// flatten clone
	errFlatten = tempClone.flattenRbdImage(ctx, rv.conn.Creds, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
	if errFlatten != nil {
		return errFlatten
	}
	// create snap of temp clone from temporary cloned image
	// create final clone
	// delete snap of temp clone
	errClone = createRBDClone(ctx, tempClone, rv, cloneSnap, rv.conn.Creds)
	// ... (remainder of the function not included)
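The remainder of createCloneFromImage is cut off, but its comments describe three steps: clone the parent into a temporary image via a snapshot named after that temporary image, flatten the temporary clone when the clone-depth limits call for it, then clone the temporary image into the final volume via a snapshot named after the final volume. The sketch below models those steps under assumed clone v2 semantics; the `-temp` suffix for the temporary clone is an assumption about generateTempClone, and every helper is hypothetical.

```python
class ToyImage:
    """Hypothetical image model with RBD clone v2 semantics (not librbd)."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent  # (parent ToyImage, snapshot name) or None
        self.snaps = {}       # live snapshot name -> names of its clones
        self.trash = {}       # removed snapshots that still have clones


def create_rbd_clone(parent, child_name, snap_name):
    """Like createRBDClone: create a snapshot, clone it, delete the snapshot."""
    parent.snaps[snap_name] = set()                        # 1. create snapshot
    parent.snaps[snap_name].add(child_name)                # 2. clone it
    child = ToyImage(child_name, parent=(parent, snap_name))
    parent.trash[snap_name] = parent.snaps.pop(snap_name)  # 3. delete it; kept in trash
    return child


def flatten(image):
    parent, snap = image.parent
    parent.trash[snap].discard(image.name)
    if not parent.trash[snap]:
        del parent.trash[snap]  # no clones left, so the trashed snapshot goes away
    image.parent = None


def create_clone_from_image(parent, name, flatten_temp):
    """Like createCloneFromImage: parent -> temp clone -> final image."""
    temp_name = name + "-temp"  # assumed naming for generateTempClone
    # The temporary snapshot is named after the temporary clone (tempSnap).
    temp = create_rbd_clone(parent, temp_name, temp_name)
    if flatten_temp:  # stands in for flattenRbdImage's clone-depth decision
        flatten(temp)
    # The second snapshot is named after the final image (cloneSnap).
    final = create_rbd_clone(temp, name, name)
    return temp, final


def ancestry(image):
    """Names from the image up through its clone parents."""
    names = [image.name]
    while image.parent is not None:
        image = image.parent[0]
        names.append(image.name)
    return names


src = ToyImage("csi-vol-1")
_, linked = create_clone_from_image(src, "csi-vol-2", flatten_temp=False)
_, detached = create_clone_from_image(src, "csi-vol-3", flatten_temp=True)
print(ancestry(linked))    # ['csi-vol-2', 'csi-vol-2-temp', 'csi-vol-1']
print(ancestry(detached))  # ['csi-vol-3', 'csi-vol-3-temp']
```

Without the flatten, the new volume sits two levels below the source; flattening the temporary clone cuts its link to the source, so the corresponding trashed snapshot disappears.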
