Kubelet Source Code Analysis: The Volume Manager

Posted by K8S中文社区



 kubernetes version: v1.12.1


Kubernetes uses volume plugins to implement operations on storage volumes, such as mounting.


Whoever drew this diagram did a great job, so I am borrowing it here.




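The kubelet calls the VolumeManager to prepare storage for pods. Once a storage device is ready, it is mounted on the node where the pod runs, and when the container starts it is mounted into the directory the container specifies. The VolumeManager also unmounts and removes storage that is no longer in use.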


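The VolumeManager interface


  • A component that runs inside the kubelet and makes storage ready. Its main job is mount/unmount (attach/detach is optional).


  • Volume operations only happen after a pod has been scheduled to this node, so the trigger is the kubelet (strictly speaking, the pod manager inside the kubelet). The volumes declared in the pod specs held by the Pod Manager drive the mount operations.


  • The kubelet watches for pods scheduled to this node and caches them in the Pod Manager. The VolumeManager reads the pods from the Pod Manager, looks up the state of the PVs/PVCs they reference, works out which attach/detach and mount/unmount operations are needed, and then calls the plugin to carry them out.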


 
   
   
 
    // VolumeManager runs a set of asynchronous loops that figure out which volumes
    // need to be attached/mounted/unmounted/detached based on the pods scheduled on
    // this node and makes it so.
    type VolumeManager interface {
        Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

        WaitForAttachAndMount(pod *v1.Pod) error

        GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

        GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

        GetVolumesInUse() []v1.UniqueVolumeName

        ReconcilerStatesHasBeenSynced() bool

        VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

        MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
    }

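The volumeManager struct

Path: pkg/kubelet/volumemanager/volume_manager.go

The volumeManager struct implements the VolumeManager interface. Two of its fields deserve particular attention:

  • desiredStateOfWorld: the desired state, i.e. which volumes need to be attached and which pods reference each volume

  • actualStateOfWorld: the actual state, i.e. which volumes are already attached to this node and which pods have each volume mounted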
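To make the two caches more concrete, here is a minimal, self-contained sketch of the idea behind them. It is not the kubelet's implementation: the `world` type, the `diff` function, and the pod and volume names are all hypothetical, and each state is reduced to a plain "volume to pods" set.

```go
package main

import (
	"fmt"
	"sort"
)

// world is a hypothetical, simplified stand-in for both state caches:
// volume name -> set of pod names that want it (desired) or have it (actual).
type world map[string]map[string]bool

// has reports whether pod is recorded under volume.
func (w world) has(volume, pod string) bool {
	return w[volume][pod] // indexing a missing (nil) inner map yields false
}

// diff returns the "volume/pod" pairs that must be mounted (desired but not
// actual) and unmounted (actual but not desired), sorted for stable output.
func diff(desired, actual world) (toMount, toUnmount []string) {
	for vol, pods := range desired {
		for pod := range pods {
			if !actual.has(vol, pod) {
				toMount = append(toMount, vol+"/"+pod)
			}
		}
	}
	for vol, pods := range actual {
		for pod := range pods {
			if !desired.has(vol, pod) {
				toUnmount = append(toUnmount, vol+"/"+pod)
			}
		}
	}
	sort.Strings(toMount)
	sort.Strings(toUnmount)
	return toMount, toUnmount
}

func main() {
	desired := world{"data": {"pod-a": true, "pod-b": true}}
	actual := world{"data": {"pod-a": true}, "logs": {"pod-c": true}}
	toMount, toUnmount := diff(desired, actual)
	fmt.Println("mount:", toMount)     // mount: [data/pod-b]
	fmt.Println("unmount:", toUnmount) // unmount: [logs/pod-c]
}
```

The real caches carry much more per entry (device paths, plugin information, remount flags), but the comparison between "what should be" and "what is" follows this shape.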


   
     
     
   
    // volumeManager implements the VolumeManager interface
    type volumeManager struct {
        // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
        // communicate with the API server to fetch PV and PVC objects
        kubeClient clientset.Interface

        // volumePluginMgr is the volume plugin manager used to access volume
        // plugins. It must be pre-initialized.
        volumePluginMgr *volume.VolumePluginMgr

        // desiredStateOfWorld is a data structure containing the desired state of
        // the world according to the volume manager: i.e. what volumes should be
        // attached and which pods are referencing the volumes).
        // The data structure is populated by the desired state of the world
        // populator using the kubelet pod manager.
        desiredStateOfWorld cache.DesiredStateOfWorld

        // actualStateOfWorld is a data structure containing the actual state of
        // the world according to the manager: i.e. which volumes are attached to
        // this node and what pods the volumes are mounted to.
        // The data structure is populated upon successful completion of attach,
        // detach, mount, and unmount actions triggered by the reconciler.
        actualStateOfWorld cache.ActualStateOfWorld

        // operationExecutor is used to start asynchronous attach, detach, mount,
        // and unmount operations.
        operationExecutor operationexecutor.OperationExecutor

        // reconciler runs an asynchronous periodic loop to reconcile the
        // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
        // detach, mount, and unmount operations using the operationExecutor.
        reconciler reconciler.Reconciler

        // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
        // populate the desiredStateOfWorld using the kubelet PodManager.
        desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
    }

1. Initializing the volumeManager

Path: the NewMainKubelet function in pkg/kubelet/kubelet.go

This initializes the volumeManager struct shown above:

   
     
     
   
    // setup volumeManager
    klet.volumeManager = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.statusManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        experimentalCheckNodeCapabilitiesBeforeMount,
        keepTerminatedPodVolumes)

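1.1 Starting the VolumeManager's Run method

  • One goroutine runs the desiredStateOfWorldPopulator's Run function, which updates the DesiredStateOfWorld from the pod information synced from the apiserver

  • Another goroutine runs the reconciler's Run function. The reconciler mediates between the desired state and the actual state and is responsible for driving the actual state toward the desired state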

   
     
     
   
    func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        defer runtime.HandleCrash()

        go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
        glog.V(2).Infof("The desired_state_of_world populator starts")

        glog.Infof("Starting Kubelet Volume Manager")
        go vm.reconciler.Run(stopCh)

        metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

        <-stopCh
        glog.Infof("Shutting down Kubelet Volume Manager")
    }
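The pattern in Run (start background loops, then block until a stop channel is closed) can be sketched with the standard library alone. Everything below is hypothetical: `runUntilStopped` and `ticker` are names invented for this example, and the sketch additionally waits for the loops to exit, which the Run method above does not do.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runUntilStopped starts every loop in its own goroutine, blocks until stopCh
// is closed, and then waits for the loops to return. The waiting step is an
// addition for this sketch.
func runUntilStopped(stopCh <-chan struct{}, loops ...func(stopCh <-chan struct{})) {
	var wg sync.WaitGroup
	for _, loop := range loops {
		wg.Add(1)
		go func(l func(<-chan struct{})) {
			defer wg.Done()
			l(stopCh)
		}(loop)
	}
	<-stopCh
	wg.Wait()
}

// ticker returns a loop that increments counter every period until stopped.
func ticker(counter *int64, period time.Duration) func(<-chan struct{}) {
	return func(stopCh <-chan struct{}) {
		t := time.NewTicker(period)
		defer t.Stop()
		for {
			select {
			case <-stopCh:
				return
			case <-t.C:
				atomic.AddInt64(counter, 1)
			}
		}
	}
}

func main() {
	var populated, reconciled int64
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stopCh) // closing the channel stops every loop at once
	}()
	runUntilStopped(stopCh,
		ticker(&populated, time.Millisecond),  // stands in for the populator
		ticker(&reconciled, time.Millisecond), // stands in for the reconciler
	)
	fmt.Println("populator ticks:", atomic.LoadInt64(&populated))
	fmt.Println("reconciler ticks:", atomic.LoadInt64(&reconciled))
}
```

Closing a channel, rather than sending on it, is what lets a single signal reach any number of loops.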

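1.2 The desiredStateOfWorldPopulator Run function

Run updates the DesiredStateOfWorld from the pod information synced from the apiserver. The main work happens in the populatorLoop function, which is covered in section 2.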

   
     
     
   
    func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        // Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
        glog.Infof("Desired state populator starts to run")
        wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
            done := sourcesReady.AllReady()
            dswp.populatorLoop()
            return done, nil
        }, stopCh)
        dswp.hasAddedPodsLock.Lock()
        dswp.hasAddedPods = true
        dswp.hasAddedPodsLock.Unlock()
        wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
    }
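One detail in Run is easy to miss: readiness is sampled before populatorLoop runs, so the flag is only set after a complete loop that started with all sources ready. The sketch below reproduces that first phase with a plain standard-library loop instead of wait.PollUntil. The `populator` type and `warmUp` method are hypothetical, and unlike the original the sketch leaves the flag unset when it is stopped early.

```go
package main

import (
	"fmt"
	"time"
)

// populator is a hypothetical stand-in for desiredStateOfWorldPopulator.
type populator struct {
	loops        int  // how many times the loop body has run
	hasAddedPods bool // set once a loop completed after sources were ready
}

// warmUp samples ready() first, then runs one loop iteration, and stops as
// soon as an iteration has finished that started with ready() == true.
// It returns false if stopCh was closed first.
func (p *populator) warmUp(ready func() bool, interval time.Duration, stopCh <-chan struct{}) bool {
	for {
		select {
		case <-stopCh:
			return false
		case <-time.After(interval):
		}
		done := ready() // sampled before the loop body, as in the original
		p.loops++       // stands in for populatorLoop()
		if done {
			p.hasAddedPods = true
			return true
		}
	}
}

func main() {
	p := &populator{}
	calls := 0
	// Hypothetical readiness check: sources become ready on the third poll.
	ready := func() bool { calls++; return calls >= 3 }
	ok := p.warmUp(ready, time.Millisecond, make(chan struct{}))
	fmt.Println(ok, p.loops, p.hasAddedPods) // true 3 true
}
```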

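1.3 The reconciler Run function

The reconciler mediates between the desired state and the actual state and is responsible for driving the actual state toward the desired state. The reconcile function is covered in section 4.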

   
     
     
   
    func (rc *reconciler) Run(stopCh <-chan struct{}) {
        wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
    }

    func (rc *reconciler) reconciliationLoopFunc() func() {
        return func() {
            rc.reconcile()

            // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
            // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
            // desired state of world does not contain a complete list of pods.
            if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
                glog.Infof("Reconciler: start to sync state")
                rc.sync()
            }
        }
    }
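The effect of the condition in reconciliationLoopFunc is that reconcile runs on every tick while sync runs at most once, and only after the populator has seen all pods. A minimal sketch of that control flow follows. The `miniReconciler` type is hypothetical, and it assumes that a sync marks the states as synced, which the excerpt above does not show.

```go
package main

import "fmt"

// miniReconciler is a hypothetical, stripped-down reconciler that only counts
// how often each step ran.
type miniReconciler struct {
	populatorReady func() bool // stand-in for populatorHasAddedPods
	synced         bool        // stand-in for StatesHasBeenSynced
	reconciles     int
	syncs          int
}

// loopFunc returns the function a periodic runner would call on each tick.
func (rc *miniReconciler) loopFunc() func() {
	return func() {
		rc.reconciles++ // reconcile runs on every tick
		// sync runs at most once, and only once the populator is ready.
		if rc.populatorReady() && !rc.synced {
			rc.syncs++
			rc.synced = true // assumption: syncing records that it happened
		}
	}
}

func main() {
	tick := 0
	rc := &miniReconciler{populatorReady: func() bool { return tick >= 2 }}
	loop := rc.loopFunc()
	for tick = 0; tick < 5; tick++ {
		loop()
	}
	fmt.Println(rc.reconciles, rc.syncs) // 5 1
}
```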

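2. The populatorLoop function

populatorLoop first calls findAndAddNewPods, which fetches all pods from the podManager and calls processPodVolumes to update the desiredStateOfWorld. It then calls findAndRemoveDeletedPods, but only if enough time has passed since the last pod status query.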

   
     
     
   
    func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
        dswp.findAndAddNewPods()

        // findAndRemoveDeletedPods() calls out to the container runtime to
        // determine if the containers for a given pod are terminated. This is
        // an expensive operation, therefore we limit the rate that
        // findAndRemoveDeletedPods() is called independently of the main
        // populator loop.
        if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
            glog.V(5).Infof(
                "Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
                dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
                dswp.getPodStatusRetryDuration)

            return
        }

        dswp.findAndRemoveDeletedPods()
    }
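The time check in populatorLoop is a simple rate limit: the cheap step runs on every loop, the expensive step at most once per retry interval. Here is a small sketch of the same idea with an injectable clock so it can be exercised without sleeping. The `throttled` type and `simulate` function are hypothetical, and where the timestamp gets updated is an assumption of this sketch, since the excerpt above does not show it.

```go
package main

import (
	"fmt"
	"time"
)

// throttled runs cheap() on every call but expensive() at most once per
// minInterval. now is injected so the behaviour can be checked without sleeping.
type throttled struct {
	minInterval time.Duration
	lastRun     time.Time
	now         func() time.Time
	cheap       func()
	expensive   func()
}

func (t *throttled) loop() {
	t.cheap()
	now := t.now()
	if now.Sub(t.lastRun) < t.minInterval {
		return // too soon since the last expensive run
	}
	t.lastRun = now
	t.expensive()
}

// simulate runs the loop n times, advancing a fake clock by step before each
// run, and returns how often each step executed.
func simulate(n int, step, minInterval time.Duration) (cheapRuns, expensiveRuns int) {
	clock := time.Unix(0, 0)
	t := &throttled{
		minInterval: minInterval,
		now:         func() time.Time { return clock },
		cheap:       func() { cheapRuns++ },
		expensive:   func() { expensiveRuns++ },
	}
	for i := 0; i < n; i++ {
		clock = clock.Add(step)
		t.loop()
	}
	return cheapRuns, expensiveRuns
}

func main() {
	// Loop every second, but allow the expensive step only every two seconds.
	cheap, expensive := simulate(6, time.Second, 2*time.Second)
	fmt.Println(cheap, expensive) // 6 3
}
```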

2.1 The findAndAddNewPods function

  • Fetches all pods from the podManager

  • Calls processPodVolumes to update the desiredStateOfWorld (covered in section 3)

   
     
     
   
    // Iterate through all pods and add to desired state of world if they don't
    // exist but should
    func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
        // Map unique pod name to outer volume name to MountedVolume.
        mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
        if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
            for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
                mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
                if !exist {
                    mountedVolumes = make(map[string]cache.MountedVolume)
                    mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
                }
                mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
            }
        }

        processedVolumesForFSResize := sets.NewString()
        for _, pod := range dswp.podManager.GetPods() {
            if dswp.isPodTerminated(pod) {
                // Do not (re)add volumes for terminated pods
                continue
            }
            dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
        }
    }
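The first half of findAndAddNewPods builds a two-level index: pod name, then outer volume name, then the mounted volume. The sketch below isolates that grouping step. The `mountedVolume` type is a hypothetical, trimmed-down stand-in for cache.MountedVolume, and the pod and volume names are made up.

```go
package main

import "fmt"

// mountedVolume is a hypothetical, trimmed-down version of cache.MountedVolume.
type mountedVolume struct {
	PodName             string
	OuterVolumeSpecName string
	VolumeName          string
}

// groupByPod indexes a flat list as pod name -> outer volume name -> volume,
// creating each inner map the first time a pod is seen.
func groupByPod(volumes []mountedVolume) map[string]map[string]mountedVolume {
	byPod := make(map[string]map[string]mountedVolume)
	for _, v := range volumes {
		inner, ok := byPod[v.PodName]
		if !ok {
			inner = make(map[string]mountedVolume)
			byPod[v.PodName] = inner
		}
		inner[v.OuterVolumeSpecName] = v
	}
	return byPod
}

func main() {
	byPod := groupByPod([]mountedVolume{
		{PodName: "pod-a", OuterVolumeSpecName: "data", VolumeName: "plugin/vol-1"},
		{PodName: "pod-a", OuterVolumeSpecName: "cache", VolumeName: "plugin/vol-2"},
		{PodName: "pod-b", OuterVolumeSpecName: "data", VolumeName: "plugin/vol-3"},
	})
	fmt.Println(len(byPod["pod-a"]))                // 2
	fmt.Println(byPod["pod-b"]["data"].VolumeName) // plugin/vol-3
}
```

With this index in place, looking up "is this volume already mounted for this pod" is two map reads instead of a scan over every mounted volume.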

3. The processPodVolumes function

This function updates the desiredStateOfWorld.

3.1 If the pod is already in the processedPods map, there is nothing to do


   
     
     
   
    uniquePodName := util.GetUniquePodName(pod)
    if dswp.podPreviouslyProcessed(uniquePodName) {
        return
    }
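The early return above relies on a "pods already handled" set. Since the populator runs in its own goroutine, such a set needs a lock; the following is a minimal sketch of one way to build it. The `processedPods` type and its methods are hypothetical names for this example, not the kubelet's own.

```go
package main

import (
	"fmt"
	"sync"
)

// processedPods is a hypothetical mutex-guarded set of handled pod names.
type processedPods struct {
	mu   sync.RWMutex
	pods map[string]bool
}

func newProcessedPods() *processedPods {
	return &processedPods{pods: make(map[string]bool)}
}

func (p *processedPods) previouslyProcessed(name string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.pods[name]
}

func (p *processedPods) markProcessed(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pods[name] = true
}

// process runs work for a pod unless the pod was already handled, and reports
// whether work ran. The pod is only marked when work reports full success, so
// a partially failed pod is retried on the next loop.
func (p *processedPods) process(name string, work func() (allAdded bool)) bool {
	if p.previouslyProcessed(name) {
		return false
	}
	if work() {
		p.markProcessed(name)
	}
	return true
}

func main() {
	p := newProcessedPods()
	attempt := 0
	work := func() bool { attempt++; return attempt >= 2 } // fails once, then succeeds
	for i := 0; i < 4; i++ {
		fmt.Println("loop", i, "ran work:", p.process("default/pod-a", work))
	}
}
```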

3.2 Add the volumeDevices and volumeMounts of every container in the pod to maps

   volumeDevices    <[]Object>
     volumeDevices is the list of block devices to be used by the container.
     This is an alpha feature and may change in the future.

   volumeMounts    <[]Object>
     Pod volumes to mount into the container's filesystem. Cannot be updated.


   
     
     
   
    func (dswp *desiredStateOfWorldPopulator) makeVolumeMap(containers []v1.Container) (map[string]bool, map[string]bool) {
        volumeDevicesMap := make(map[string]bool)
        volumeMountsMap := make(map[string]bool)

        for _, container := range containers {
            if container.VolumeMounts != nil {
                for _, mount := range container.VolumeMounts {
                    volumeMountsMap[mount.Name] = true
                }
            }
            // TODO: remove feature gate check after no longer needed
            if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) &&
                container.VolumeDevices != nil {
                for _, device := range container.VolumeDevices {
                    volumeDevicesMap[device.Name] = true
                }
            }
        }

        return volumeMountsMap, volumeDevicesMap
    }
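To try the idea behind makeVolumeMap outside the kubelet, here is a standalone sketch. The `container` type is a hypothetical, minimal stand-in for v1.Container, and the `blockEnabled` parameter plays the role of the BlockVolume feature gate. Named results are used so that the order of the two returned maps is visible in the signature.

```go
package main

import "fmt"

// container is a hypothetical, minimal stand-in for v1.Container.
type container struct {
	Name          string
	VolumeMounts  []string // names of volumes mounted as filesystems
	VolumeDevices []string // names of volumes exposed as raw block devices
}

// volumeNameSets returns the set of volume names used as mounts and the set
// used as devices across all containers.
func volumeNameSets(containers []container, blockEnabled bool) (mounts, devices map[string]bool) {
	mounts = make(map[string]bool)
	devices = make(map[string]bool)
	for _, c := range containers {
		for _, name := range c.VolumeMounts { // ranging over a nil slice is a no-op
			mounts[name] = true
		}
		if blockEnabled {
			for _, name := range c.VolumeDevices {
				devices[name] = true
			}
		}
	}
	return mounts, devices
}

func main() {
	containers := []container{
		{Name: "app", VolumeMounts: []string{"data", "config"}},
		{Name: "sidecar", VolumeMounts: []string{"data"}, VolumeDevices: []string{"raw-disk"}},
	}
	mounts, devices := volumeNameSets(containers, true)
	fmt.Println(len(mounts), len(devices)) // 2 1
}
```

Using maps as sets removes duplicates for free: "data" is mounted by both containers but appears once.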

3.3 The AddPodToVolume function

3.3.1 Call FindPluginBySpec to find the volume plugin that matches the volume spec


   
     
     
   
    volumePlugin, err := dsw.volumePluginMgr.FindPluginBySpec(volumeSpec)
    if err != nil || volumePlugin == nil {
        return "", fmt.Errorf(
            "failed to get Plugin from volumeSpec for volume %q err=%v",
            volumeSpec.Name(),
            err)
    }

3.3.2 The isAttachableVolume function, which calls NewAttacher() to initialize the plugin's attacher

   
     
     
   
    // The unique volume name used depends on whether the volume is attachable
    // or not.
    attachable := dsw.isAttachableVolume(volumeSpec)
    if attachable {
        // For attachable volumes, use the unique volume name as reported by
        // the plugin.
        volumeName, err =
            util.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
        if err != nil {
            return "", fmt.Errorf(
                "failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
                volumeSpec.Name(),
                volumePlugin.GetPluginName(),
                err)
        }
    } else {
        // For non-attachable volumes, generate a unique name based on the pod
        // namespace and name and the name of the volume within the pod.
        volumeName = util.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec)
    }
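The consequence of the two naming rules is worth spelling out: pods that share an attachable device end up under one volume name, while non-attachable volumes get one name per pod. The sketch below illustrates this. The `uniqueVolumeName` function is hypothetical and its format strings are assumptions made for this example; they are not taken from the excerpt above.

```go
package main

import "fmt"

// uniqueVolumeName sketches the naming rule described above. The exact
// format strings are assumptions made for this example.
func uniqueVolumeName(attachable bool, pluginName, podName, pluginVolumeID, specName string) string {
	if attachable {
		// One name per underlying device, shared by every pod that uses it.
		return fmt.Sprintf("%s/%s", pluginName, pluginVolumeID)
	}
	// One name per (pod, volume) pair, so pods never share an entry.
	return fmt.Sprintf("%s/%s-%s", pluginName, podName, specName)
}

func main() {
	// Two pods sharing one attachable disk resolve to the same volume name.
	fmt.Println(uniqueVolumeName(true, "kubernetes.io/gce-pd", "pod-a", "disk-1", "data"))
	fmt.Println(uniqueVolumeName(true, "kubernetes.io/gce-pd", "pod-b", "disk-1", "data"))
	// Two pods with a same-named non-attachable volume get distinct names.
	fmt.Println(uniqueVolumeName(false, "kubernetes.io/empty-dir", "pod-a", "", "scratch"))
	fmt.Println(uniqueVolumeName(false, "kubernetes.io/empty-dir", "pod-b", "", "scratch"))
}
```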

3.3.3 Record the relationship between the volume and the pod

   
     
     
   
    if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists {
        dsw.volumesToMount[volumeName] = volumeToMount{
            volumeName:              volumeName,
            podsToMount:             make(map[types.UniquePodName]podToMount),
            pluginIsAttachable:      attachable,
            pluginIsDeviceMountable: deviceMountable,
            volumeGidValue:          volumeGidValue,
            reportedInUse:           false,
        }
    }

    // Create new podToMount object. If it already exists, it is refreshed with
    // updated values (this is required for volumes that require remounting on
    // pod update, like Downward API volumes).
    dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
        podName:             podName,
        pod:                 pod,
        volumeSpec:          volumeSpec,
        outerVolumeSpecName: outerVolumeSpecName,
    }
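The last assignment above writes through a struct stored by value in a map, which can look suspicious to Go readers. It works because the field being written is itself a map. The following sketch shows the same shape with hypothetical, simplified types.

```go
package main

import "fmt"

// Hypothetical, simplified versions of the types used above.
type podToMount struct {
	podName  string
	specName string
}

type volumeToMount struct {
	volumeName  string
	podsToMount map[string]podToMount
}

type desiredState struct {
	volumesToMount map[string]volumeToMount
}

// addPodToVolume creates the volume entry on first use and then inserts or
// refreshes the pod entry under it.
func (dsw *desiredState) addPodToVolume(volumeName, podName, specName string) {
	if _, exists := dsw.volumesToMount[volumeName]; !exists {
		dsw.volumesToMount[volumeName] = volumeToMount{
			volumeName:  volumeName,
			podsToMount: make(map[string]podToMount),
		}
	}
	// volumesToMount stores struct values, which are not addressable, so a
	// plain field could not be assigned this way. podsToMount is a map,
	// though, and writing through it updates the shared underlying map.
	dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
		podName:  podName,
		specName: specName,
	}
}

func main() {
	dsw := &desiredState{volumesToMount: make(map[string]volumeToMount)}
	dsw.addPodToVolume("vol-1", "pod-a", "data")
	dsw.addPodToVolume("vol-1", "pod-b", "data")
	dsw.addPodToVolume("vol-1", "pod-a", "data-v2") // refreshes the existing entry
	fmt.Println(len(dsw.volumesToMount["vol-1"].podsToMount))            // 2
	fmt.Println(dsw.volumesToMount["vol-1"].podsToMount["pod-a"].specName) // data-v2
}
```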

3.3.4 Mark the pod name as processed and flag the pod for remount in the actual state of world

   
     
     
   
    // some of the volume additions may have failed, should not mark this pod as fully processed
    if allVolumesAdded {
        dswp.markPodProcessed(uniquePodName)
        // New pod has been synced. Re-mount all volumes that need it
        // (e.g. DownwardAPI)
        dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
    }

4. The reconcile function

Path: pkg/kubelet/volumemanager/reconciler/reconciler.go


   
     
     
   
    func (rc *reconciler) reconcile() {
        // Unmounts are triggered before mounts so that a volume that was
        // referenced by a pod that was deleted and is now referenced by another
        // pod is unmounted from the first pod before being mounted to the new
        // pod.

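4.1 Unmount volumes that are mounted but no longer desired

The work is done in the UnmountVolume function, which distinguishes filesystem volumes from block volumes. A volume needs to be unmounted when it appears in the actualStateOfWorld but the desiredStateOfWorld no longer contains it.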


   
     
     
   
    // Ensure volumes that should be unmounted are unmounted.
    for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
            // Volume is mounted, unmount it
            glog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
            err := rc.operationExecutor.UnmountVolume(
                mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
            if err != nil &&
                !nestedpendingoperations.IsAlreadyExists(err) &&
                !exponentialbackoff.IsExponentialBackoff(err) {
                // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                // Log all other errors.
                glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
            }
            if err == nil {
                glog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
            }
        }
    }
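The error handling in this loop treats two kinds of error as expected: "the same operation is already pending" and "the operation is still backing off". Both simply mean "try again on a later loop". The sketch below shows one way to express that three-way outcome with the standard errors package. The error types, `errDeviceBusy`, and the `classify` function are hypothetical stand-ins, not the kubelet's own types.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error types standing in for the "already pending" and
// "still backing off" errors mentioned above.
type alreadyExistsError struct{ op string }

func (e alreadyExistsError) Error() string { return "operation " + e.op + " is already running" }

type backoffError struct{ op string }

func (e backoffError) Error() string { return "operation " + e.op + " is backing off" }

// errDeviceBusy is a made-up example of a genuine failure.
var errDeviceBusy = errors.New("device is busy")

// isExpected reports whether err only means "try again on a later loop".
func isExpected(err error) bool {
	var ae alreadyExistsError
	var be backoffError
	return errors.As(err, &ae) || errors.As(err, &be)
}

// classify says how a loop would treat the result of starting an operation.
func classify(err error) string {
	switch {
	case err == nil:
		return "started"
	case isExpected(err):
		return "skipped"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(nil))
	// errors.As also sees through wrapping added with %w.
	fmt.Println(classify(fmt.Errorf("unmount vol-1: %w", alreadyExistsError{op: "unmount"})))
	fmt.Println(classify(backoffError{op: "unmount"}))
	fmt.Println(classify(errDeviceBusy))
}
```

Only the "failed" case deserves an error log; logging the expected cases on every loop would flood the logs while an operation is in flight.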

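4.2 Get the volumes that need to be mounted from the desiredStateOfWorld


For each of them, PodExistsInVolume is called on the actual state to compare against the pods actually recorded there.


If attachedVolumes does not contain the volumeName, the result is a volumeNotAttachedError.


If mountedPods contains the podName, then podObj.remountRequired yields newRemountRequiredError and podObj.fsResizeRequired yields newFsResizeRequiredError.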


   
     
     
   
    // Ensure volumes that should be attached/mounted are attached/mounted.
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        volumeToMount.DevicePath = devicePath
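The lookup described above returns both a "mounted" flag and an error that tells the caller why more work is needed. A compact sketch of that contract follows. All names are hypothetical, the filesystem-resize case is left out, and the `nextAction` mapping (in particular the mount branch) is an assumption of this example, since the excerpt only shows the not-attached case.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for the typed errors named above.
var (
	errNotAttached     = errors.New("volume is not attached")
	errRemountRequired = errors.New("volume must be remounted")
)

type podState struct{ remountRequired bool }

type actualState struct {
	// attachedVolumes maps volume name -> pod name -> mount state.
	attachedVolumes map[string]map[string]podState
}

// podExistsInVolume reports whether the volume is mounted for the pod, and
// uses the error to signal what is still missing.
func (asw *actualState) podExistsInVolume(pod, volume string) (bool, error) {
	pods, attached := asw.attachedVolumes[volume]
	if !attached {
		return false, errNotAttached
	}
	state, mounted := pods[pod]
	if !mounted {
		return false, nil // attached, but not yet mounted for this pod
	}
	if state.remountRequired {
		return true, errRemountRequired
	}
	return true, nil
}

// nextAction maps the lookup result to what a reconcile loop would do next.
func nextAction(mounted bool, err error) string {
	switch {
	case errors.Is(err, errNotAttached):
		return "attach"
	case !mounted || errors.Is(err, errRemountRequired):
		return "mount"
	default:
		return "none"
	}
}

func main() {
	asw := &actualState{attachedVolumes: map[string]map[string]podState{
		"vol-1": {"pod-a": {}, "pod-b": {remountRequired: true}},
	}}
	for _, q := range [][2]string{{"pod-a", "vol-2"}, {"pod-c", "vol-1"}, {"pod-b", "vol-1"}, {"pod-a", "vol-1"}} {
		mounted, err := asw.podExistsInVolume(q[0], q[1])
		fmt.Println(q[0], q[1], "->", nextAction(mounted, err))
	}
}
```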

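4.3 When err is a volumeNotAttachedError


If the volume is not attached, that is, attachedVolumes in the actualStateOfWorld struct has no entry for the volumeName, the reconciler takes one of two paths. When the attach/detach controller is enabled or the plugin is not attachable, it calls VerifyControllerAttachedVolume and waits for the controller. Otherwise it calls AttachVolume itself.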

   
     
     
   
    if cache.IsVolumeNotAttachedError(err) {
        if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
            // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
            // for controller to finish attaching volume.
            glog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
            err := rc.operationExecutor.VerifyControllerAttachedVolume(
                volumeToMount.VolumeToMount,
                rc.nodeName,
                rc.actualStateOfWorld)
            if err != nil &&
                !nestedpendingoperations.IsAlreadyExists(err) &&
                !exponentialbackoff.IsExponentialBackoff(err) {
                // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                // Log all other errors.
                glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
            }
            if err == nil {
                glog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
            }
        } else {
            // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
            // so attach it
            volumeToAttach := operationexecutor.VolumeToAttach{
                VolumeName: volumeToMount.VolumeName,
                VolumeSpec: volumeToMount.VolumeSpec,
                NodeName:   rc.nodeName,
            }
            glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
            err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
            if err != nil &&
                !nestedpendingoperations.IsAlreadyExists(err) &&
                !exponentialbackoff.IsExponentialBackoff(err) {
                // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                // Log all other errors.
                glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
            }
            if err == nil {
                glog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
            }
        }
    }
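Stripped of logging and error handling, the branch above is a two-input decision. The sketch below prints its full truth table. The `attachAction` function is a hypothetical helper written for this example, and it returns the operation names as strings only for display.

```go
package main

import "fmt"

// attachAction returns which operation a reconcile loop would start for a
// volume that is not yet attached, following the condition shown above.
func attachAction(controllerAttachDetachEnabled, pluginIsAttachable bool) string {
	if controllerAttachDetachEnabled || !pluginIsAttachable {
		// Either the controller owns attaching, or there is nothing to attach:
		// only verify and record the volume.
		return "VerifyControllerAttachedVolume"
	}
	// The kubelet owns attaching and the plugin supports it.
	return "AttachVolume"
}

func main() {
	for _, controller := range []bool{true, false} {
		for _, attachable := range []bool{true, false} {
			fmt.Printf("controller=%-5v attachable=%-5v -> %s\n",
				controller, attachable, attachAction(controller, attachable))
		}
	}
}
```

The kubelet attaches a volume itself in exactly one of the four cases: controller-managed attach/detach is disabled and the plugin is attachable.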

Original article:

https://blog.csdn.net/zhonglinzhang/article/details/82800287

