An Analysis of the Kubernetes pv-controller

Posted by 阿里云云栖号



Overview: The pv controller is one of the components of kube-controller-manager (kcm). It handles the PVC/PV objects in a cluster and drives their state transitions. This article analyzes it based on Kubernetes 1.23.

Author | 牧琦
Source | 阿里技术 official account

Based on Kubernetes 1.23

I Introduction

The pv controller is one of the components of kube-controller-manager (kcm); it handles the PVC/PV objects in the cluster and drives their state transitions.

II pvController Initialization

The initialization code lives in pkg/controller/volume/persistentvolume/pv_controller_base.go. NewController mainly does the following:

  • initializes the eventRecorder
  • initializes the PersistentVolumeController object
  • calls VolumePluginMgr.InitPlugins() to initialize the volume plugins; that code lives in pkg/volume/plugins.go
  • creates informers to watch resources in the cluster; the following informers are initialized:

    • PersistentVolumeInformer
    • PersistentVolumeClaimInformer
    • StorageClassInformer
    • PodInformer
    • NodeInformer
  • enqueues PV & PVC events into volumeQueue & claimQueue respectively (see the sketch below)
  • registers a custom indexer that indexes pods by PVC key, so the controller does not have to iterate over all pods on every sync
  • initializes the manager for the in-tree storage -> CSI migration feature

NewController is called from the cmd/kube-controller-manager code; once it returns successfully, go Run() is invoked to start the pvController.
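
The event wiring is worth seeing once. Below is a lightly adapted sketch of how NewController hooks the PV/PVC informers up to the two work queues (from pv_controller_base.go; error handling and the other informers are omitted here):

  volumeInformer.Informer().AddEventHandler(
    cache.ResourceEventHandlerFuncs{
      // All three event types funnel into the same queue; the worker later
      // figures out from the caches whether it was an add, update or delete.
      AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
      UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
      DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
    },
  )
  claimInformer.Informer().AddEventHandler(
    cache.ResourceEventHandlerFuncs{
      AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
      UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
      DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
    },
  )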

III Starting Up

// Run starts the pvController.
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer ctrl.claimQueue.ShutDown()
  defer ctrl.volumeQueue.ShutDown()

  klog.Infof("Starting persistent volume controller")
  defer klog.Infof("Shutting down persistent volume controller")

  if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
    return
  }

  ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

  go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
  go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
  go wait.Until(ctrl.claimWorker, time.Second, stopCh)

  metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)

  <-stopCh
}

After the caches are synced, ctrl.resync, ctrl.volumeWorker and ctrl.claimWorker start running periodically. Let's look at the initializeCaches method first:

func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
  // No apiserver round-trip here: the objects come from the local informer
  // cache and must not be mutated by callers.
  volumeList, err := volumeLister.List(labels.Everything())
  if err != nil {
    klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
    return
  }
  for _, volume := range volumeList {
    // We must not modify the cached volume object, so take a deep copy
    // and operate on that instead.
    volumeClone := volume.DeepCopy()
    if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
      klog.Errorf("error updating volume cache: %v", err)
    }
  }

  claimList, err := claimLister.List(labels.Everything())
  if err != nil {
    klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
    return
  }
  for _, claim := range claimList {
    if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
      klog.Errorf("error updating claim cache: %v", err)
    }
  }
  klog.V(4).Infof("controller initialized")
}

type persistentVolumeOrderedIndex struct {
  store cache.Indexer
}
This method copies the objects from the lister caches into persistentVolumeOrderedIndex, a cache of PersistentVolumes indexed by AccessModes and ordered by storage capacity.
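
As a sketch of how that index is keyed, here is the access-modes index function from pkg/controller/volume/persistentvolume/index.go (lightly annotated):

  // accessModesIndexFunc keys every PV by the string form of its access
  // modes (e.g. "RWO" or "RWO,ROX"), so volumes with compatible modes can
  // be listed with a single indexer lookup.
  func accessModesIndexFunc(obj interface{}) ([]string, error) {
    if pv, ok := obj.(*v1.PersistentVolume); ok {
      modes := v1helper.GetAccessModesAsString(pv.Spec.AccessModes)
      return []string{modes}, nil
    }
    return []string{""}, fmt.Errorf("object is not a persistent volume: %v", obj)
  }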

1 resync

func (ctrl *PersistentVolumeController) resync() {
  klog.V(4).Infof("resyncing PV controller")

  pvcs, err := ctrl.claimLister.List(labels.NewSelector())
  if err != nil {
    klog.Warningf("cannot list claims: %s", err)
    return
  }
  for _, pvc := range pvcs {
    ctrl.enqueueWork(ctrl.claimQueue, pvc)
  }

  pvs, err := ctrl.volumeLister.List(labels.NewSelector())
  if err != nil {
    klog.Warningf("cannot list persistent volumes: %s", err)
    return
  }
  for _, pv := range pvs {
    ctrl.enqueueWork(ctrl.volumeQueue, pv)
  }
}
This re-enqueues every PVC/PV in the cluster into the corresponding claimQueue & volumeQueue for reprocessing. The resyncPeriod equals a random factor multiplied by the base resync duration configured when kcm starts, which spreads the periodic resyncs out over time.
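
enqueueWork itself is small; adapted from pv_controller_base.go, it queues objects by their namespace/name key:

  // enqueueWork adds the object's namespace/name key to the given work
  // queue, unwrapping deletion tombstones first.
  func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, obj interface{}) {
    // Beware of "xxx deleted" events
    if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
      obj = unknown.Obj
    }
    objName, err := controller.KeyFunc(obj)
    if err != nil {
      klog.Errorf("failed to get key from object: %v", err)
      return
    }
    klog.V(5).Infof("enqueued %q for sync", objName)
    queue.Add(objName)
  }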

2 volumeWorker

An infinite loop that keeps processing the PersistentVolumes taken off the volumeQueue:

workFunc := func() bool {
  keyObj, quit := ctrl.volumeQueue.Get()
  if quit {
    return true
  }
  defer ctrl.volumeQueue.Done(keyObj)
  key := keyObj.(string)
  klog.V(5).Infof("volumeWorker[%s]", key)

  _, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
    return false
  }
  volume, err := ctrl.volumeLister.Get(name)
  if err == nil {
    // The volume still exists in informer cache, the event must have
    // been add/update/sync
    ctrl.updateVolume(volume)
    return false
  }
  if !errors.IsNotFound(err) {
    klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
    return false
  }

  // The volume is not in informer cache, the event must have been
  // "delete"
  volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
  if err != nil {
    klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
    return false
  }
  if !found {
    // The controller has already processed the delete event and
    // deleted the volume from its cache
    klog.V(2).Infof("deletion of volume %q was already processed", key)
    return false
  }
  volume, ok := volumeObj.(*v1.PersistentVolume)
  if !ok {
    klog.Errorf("expected volume, got %+v", volumeObj)
    return false
  }
  ctrl.deleteVolume(volume)
  return false
}

We will mainly look at the ctrl.updateVolume(volume) method.

updateVolume

The updateVolume method is the actual handler for PV events in the cluster; internally it mainly delegates to ctrl.syncVolume.
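
Before diving into syncVolume, here is a condensed sketch of updateVolume (adapted from pv_controller_base.go): it first stores the incoming object version in the controller's local cache, drops stale versions, and only then syncs.

  func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
    // Store the new volume version in the cache and skip processing if
    // this is an old version.
    new, err := ctrl.storeVolumeUpdate(volume)
    if err != nil {
      klog.Errorf("%v", err)
    }
    if !new {
      return
    }

    if err := ctrl.syncVolume(context.TODO(), volume); err != nil {
      if errors.IsConflict(err) {
        // Version conflicts are common here and the controller recovers
        // from them easily on the next sync.
        klog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
      } else {
        klog.Errorf("could not sync volume %q: %+v", volume.Name, err)
      }
    }
  }

syncVolume then implements the actual PV state machine: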

func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {
  klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

  ...
  // [Unit test set 4]
  if volume.Spec.ClaimRef == nil {
    // Volume is unused
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
    if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
      // Nothing was saved; we will fall back into the same
      // condition in the next call to this method
      return err
    }
    return nil
  } else /* pv.Spec.ClaimRef != nil */ {
    // Volume is bound to a claim.
    if volume.Spec.ClaimRef.UID == "" {
      // The PV is reserved for a PVC; that PVC has not yet been
      // bound to this PV; the PVC sync will handle it.
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
      if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
      }
      return nil
    }
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
    // Get the PVC by _name_
    var claim *v1.PersistentVolumeClaim
    claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
    obj, found, err := ctrl.claims.GetByKey(claimName)
    if err != nil {
      return err
    }
    if !found {
      if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
        obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
        if err != nil && !apierrors.IsNotFound(err) {
          return err
        }
        found = !apierrors.IsNotFound(err)
        if !found {
          obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
          if err != nil && !apierrors.IsNotFound(err) {
            return err
          }
          found = !apierrors.IsNotFound(err)
        }
      }
    }
    if !found {
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
      // Fall through with claim = nil
    } else {
      var ok bool
      claim, ok = obj.(*v1.PersistentVolumeClaim)
      if !ok {
        return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
      }
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
    }
    if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
      klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))

      claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
      if err != nil && !apierrors.IsNotFound(err) {
        return err
      } else if claim != nil {
        // Treat the volume as bound to a missing claim.
        if claim.UID != volume.Spec.ClaimRef.UID {
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
          claim = nil
        } else {
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
        }
      }
    }

    if claim == nil {
      if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
        // Also, log this only once:
        klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
        if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
          // Nothing was saved; we will fall back into the same condition
          // in the next call to this method
          return err
        }
      }
      if err = ctrl.reclaimVolume(volume); err != nil {
        // Release failed, we will fall back into the same condition
        // in the next call to this method
        return err
      }
      if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
        // volume is being retained, it references a claim that does not exist now.
        klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)
      }
      return nil
    } else if claim.Spec.VolumeName == "" {
      if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
        volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
        ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
        claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
        // Skipping syncClaim
        return nil
      }

      if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
        // The binding is not completed; let PVC sync handle it
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
      } else {
        // Dangling PV; try to re-establish the link in the PVC sync
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
      }
      ctrl.claimQueue.Add(claimToClaimKey(claim))
      return nil
    } else if claim.Spec.VolumeName == volume.Name {
      // Volume is bound to a claim properly, update status if necessary
      klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
      if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
      }
      return nil
    } else {
      // Volume is bound to a claim, but the claim is bound elsewhere
      if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
        if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
          // Also, log this only once:
          klog.V(2).Infof("dynamically provisioned volume %q is released and it will be deleted", volume.Name)
          if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
            // Nothing was saved; we will fall back into the same condition
            // in the next call to this method
            return err
          }
        }
        if err = ctrl.reclaimVolume(volume); err != nil {
          return err
        }
        return nil
      } else {
        if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
          if err = ctrl.unbindVolume(volume); err != nil {
            return err
          }
          return nil
        } else {
          // The PV must have been created with this ptr; leave it alone.
          klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
          if err = ctrl.unbindVolume(volume); err != nil {
            return err
          }
          return nil
        }
      }
    }
  }
}

1. When the PV's Spec.ClaimRef is nil, the PV is not in use; ctrl.updateVolumePhase is called to move the PV into the Available phase.

2. When the PV's Spec.ClaimRef is not nil, the PV is bound to a PVC:

  • when Spec.ClaimRef.UID is empty, the PVC has not been bound to this PV yet; ctrl.updateVolumePhase moves the PV into the Available phase, the method returns, and the PVC-side syncClaim handles the rest
  • the PVC referenced by Spec.ClaimRef is looked up in the pv_controller's local cache
  • if the PVC is not found:

    • the cache may simply be stale (for example under heavy cluster load), so the controller next checks the informer cache, and if the PVC is still missing, queries the apiserver directly
    • if a PV that is neither Released nor Failed still has no PVC after all the steps above, the PVC has been deleted; recent Kubernetes versions then check the reclaimPolicy and update the PV's state accordingly
  • once the PVC is found:

1) If the PVC's UID differs from Spec.ClaimRef.UID, the PVC the PV points to was most likely deleted and immediately recreated under the same name, with the cache not yet caught up. The controller double-checks against the apiserver; if the claim still does not match, the PV is bound to a PVC that no longer exists, so claim is set to nil and the "PVC not found" logic above runs.

2) If the PVC's volumeName is empty:

  • the PVC's volumeMode is checked against the PV's volumeMode; a mismatch is reported as an event
  • if the PV carries the annotation AnnBoundByController = "pv.kubernetes.io/bound-by-controller", the PVC/PV binding process is still in flight
  • the PVC is put on the claimQueue for the claimWorker to process

3) If pvc.Spec.volumeName == pv.Name, the PV is simply set to the Bound phase.

4) If pvc.Spec.volumeName != pv.Name:

  • if the PV was dynamically provisioned and its ReclaimPolicy is Delete, the PVC has been bound to another PV; the PV is moved to the Released phase and waits for the deleter to remove it
  • if the PV was not dynamically provisioned, it is unbound: its ClaimRef is cleared entirely if the controller created the binding, or only ClaimRef.UID is cleared if the user pre-bound it (see the sketch below)
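
The last bullet's distinction comes from unbindVolume; a condensed sketch (adapted from pv_controller.go, persistence of the modified PV omitted):

  func (ctrl *PersistentVolumeController) unbindVolume(volume *v1.PersistentVolume) error {
    klog.V(4).Infof("updating PersistentVolume[%s]: rolling back binding from %q", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

    // Cached objects must not be mutated, so work on a deep copy.
    volumeClone := volume.DeepCopy()
    if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
      // The binding was created by the controller: drop the whole reference.
      volumeClone.Spec.ClaimRef = nil
      delete(volumeClone.Annotations, pvutil.AnnBoundByController)
      if len(volumeClone.Annotations) == 0 {
        volumeClone.Annotations = nil
      }
    } else {
      // The PV was pre-bound by the user: keep the reference, clear only the UID.
      volumeClone.Spec.ClaimRef.UID = ""
    }

    // ... save volumeClone back to the apiserver and move the PV to the
    // Available phase ...
    return nil
  }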

3 claimWorker

An infinite loop that keeps processing the PersistentVolumeClaims taken off the claimQueue:

workFunc := func() bool {
  keyObj, quit := ctrl.claimQueue.Get()
  if quit {
    return true
  }
  defer ctrl.claimQueue.Done(keyObj)
  key := keyObj.(string)
  klog.V(5).Infof("claimWorker[%s]", key)

  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
    return false
  }
  claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
  if err == nil {
    // The claim still exists in informer cache, the event must have
    // been add/update/sync
    ctrl.updateClaim(claim)
    return false
  }
  if !errors.IsNotFound(err) {
    klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
    return false
  }

  // The claim is not in informer cache, the event must have been "delete"
  claimObj, found, err := ctrl.claims.GetByKey(key)
  if err != nil {
    klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
    return false
  }
  if !found {
    // The controller has already processed the delete event and
    // deleted the claim from its cache
    klog.V(2).Infof("deletion of claim %q was already processed", key)
    return false
  }
  claim, ok := claimObj.(*v1.PersistentVolumeClaim)
  if !ok {
    klog.Errorf("expected claim, got %+v", claimObj)
    return false
  }
  ctrl.deleteClaim(claim)
  return false
}

We will mainly look at ctrl.updateClaim(claim). As on the volume side, it mainly delegates to ctrl.syncClaim, which in turn calls ctrl.syncUnboundClaim or ctrl.syncBoundClaim depending on the PVC's state.
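
The dispatch criterion is the pvutil.AnnBindCompleted ("pv.kubernetes.io/bind-completed") annotation; a condensed sketch of syncClaim:

  // Claims that do not yet carry the bind-completed annotation go through
  // the unbound path; everything else is treated as already bound.
  func (ctrl *PersistentVolumeController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

    if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
      return ctrl.syncUnboundClaim(ctx, claim)
    }
    return ctrl.syncBoundClaim(claim)
  }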

syncUnboundClaim

func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
  if claim.Spec.VolumeName == "" {
    // User did not care which PV they get.
    delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
    if err != nil {
      return err
    }

    // [Unit test set 1]
    volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
    if err != nil {
      klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
      return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
    }
    if volume == nil {
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
      switch {
      case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
        if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
          return err
        }
      case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
        if err = ctrl.provisionClaim(ctx, claim); err != nil {
          return err
        }
        return nil
      default:
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
      }

      // Mark the claim as Pending and try to find a match in the next
      // periodic syncClaim
      if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
        return err
      }
      return nil
    } else /* pv != nil */ {
      // Found a PV for this claim
      // OBSERVATION: pvc is "Pending", pv is "Available"
      claimKey := claimToClaimKey(claim)
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
      if err = ctrl.bind(volume, claim); err != nil {
        metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
        return err
      }
      metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
      return nil
    }
  } else /* pvc.Spec.VolumeName != nil */ {
    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
      return err
    }
    if !found {
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
      if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
        return err
      }
      return nil
    } else {
      volume, ok := obj.(*v1.PersistentVolume)
      if !ok {
        return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
      }
      klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
      if volume.Spec.ClaimRef == nil {
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
        if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
          klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
          // send an event
          msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
          ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
          // volume does not satisfy the requirements of the claim
          if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
            return err
          }
        } else if err = ctrl.bind(volume, claim); err != nil {
          // On any error saving the volume or the claim, subsequent
          // syncClaim will finish the binding.
          return err
        }
        // OBSERVATION: pvc is "Bound", pv is "Bound"
        return nil
      } else if pvutil.IsVolumeBoundToClaim(volume, claim) {
        // User asked for a PV that is claimed by this PVC
        // OBSERVATION: pvc is "Pending", pv is "Bound"
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

        // Finish the volume binding by adding claim UID.
        if err = ctrl.bind(volume, claim); err != nil {
          return err
        }
        // OBSERVATION: pvc is "Bound", pv is "Bound"
        return nil
      } else {
        // User asked for a PV that is claimed by someone else
        // OBSERVATION: pvc is "Pending", pv is "Bound"
        if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
          klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
          claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
          ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
          // User asked for a specific PV, retry later
          if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
            return err
          }
          return nil
        } else {
          klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
          claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
          ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)

          return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
        }
      }
    }
  }
}

Let's lay out the overall flow:

  • If the PVC's volumeName is empty:

    • determine whether the PVC uses delayed binding
    • call volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) to find a matching PV (a sketch of the matching idea follows this list)
  • If a volume is found:

    • call ctrl.bind(volume, claim) to perform the binding
  • If no volume is found:

    • if the PVC uses delayed binding and it has not been triggered yet (no pod references the PVC), emit an event on the PVC
    • if the PVC references a StorageClass, call ctrl.provisionClaim(ctx, claim), which:
  1. parses the PVC spec and resolves the provisioner driver
  2. starts a goroutine
  3. calls ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) to do the actual provisioning
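
findBestMatchForClaim ultimately delegates to pvutil.FindMatchingVolume, which picks the smallest Available PV that still satisfies the claim. The real function additionally honors pre-binding, StorageClass, label selectors, volumeMode and node affinity; the toy version below only shows the smallest-fit idea:

  // smallestFit is an illustrative reduction of the matching logic: among
  // Available volumes that are large enough, prefer the smallest one.
  func smallestFit(claim *v1.PersistentVolumeClaim, volumes []*v1.PersistentVolume) *v1.PersistentVolume {
    requested := claim.Spec.Resources.Requests[v1.ResourceStorage]
    var best *v1.PersistentVolume
    for _, pv := range volumes {
      if pv.Status.Phase != v1.VolumeAvailable {
        continue // only unbound volumes are candidates
      }
      capacity := pv.Spec.Capacity[v1.ResourceStorage]
      if capacity.Cmp(requested) < 0 {
        continue // too small for the claim
      }
      if best == nil || capacity.Cmp(best.Spec.Capacity[v1.ResourceStorage]) < 0 {
        best = pv // smallest volume that still fits
      }
    }
    return best
  }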

provisionClaimOperation

func (ctrl *PersistentVolumeController) provisionClaimOperation(
  ctx context.Context,
  claim *v1.PersistentVolumeClaim,
  plugin vol.ProvisionableVolumePlugin,
  storageClass *storage.StorageClass) (string, error) {
  claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)
  klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

  pluginName := plugin.GetPluginName()
  if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil {
    strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)
    return pluginName, fmt.Errorf(strerr)
  }
  provisionerName := storageClass.Provisioner
  // Add provisioner annotation to be consistent with external provisioner workflow
  newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
  if err != nil {
    // Save failed, the controller will retry in the next sync
    klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
    return pluginName, err
  }
  claim = newClaim

  pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
  volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
  if err != nil && !apierrors.IsNotFound(err) {
    klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)
    return pluginName, err
  }
  if err == nil && volume != nil {
    // Volume has been already provisioned, nothing to do.
    klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
    return pluginName, err
  }

  // Prepare a claimRef to the claim early (to fail before a volume is
  // provisioned)
  claimRef, err := ref.GetReference(scheme.Scheme, claim)
  if err != nil {
    klog.V(3).Infof("unexpected error getting claim reference: %v", err)
    return pluginName, err
  }

  // Gather provisioning options
  tags := make(map[string]string)
  tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
  tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
  tags[CloudVolumeCreatedForVolumeNameTag] = pvName

  options := vol.VolumeOptions{
    PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
    MountOptions:                  storageClass.MountOptions,
    CloudTags:                     &tags,
    ClusterName:                   ctrl.clusterName,
    PVName:                        pvName,
    PVC:                           claim,
    Parameters:                    storageClass.Parameters,
  }

  // Refuse to provision if the plugin doesn't support mount options, creation
  // of PV would be rejected by validation anyway
  if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
    strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
    klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
    return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
  }

  // Provision the volume
  provisioner, err := plugin.NewProvisioner(options)
  if err != nil {
    strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
    klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
    return pluginName, err
  }

  var selectedNode *v1.Node = nil
  if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
    selectedNode, err = ctrl.NodeLister.Get(nodeName)
    if err != nil {
      strerr := fmt.Sprintf("Failed to get target node: %v", err)
      klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
      return pluginName, err
    }
  }
  allowedTopologies := storageClass.AllowedTopologies

  opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
  volume, err = provisioner.Provision(selectedNode, allowedTopologies)
  opComplete(volumetypes.CompleteFuncParam{Err: &err})
  if err != nil {
    ctrl.rescheduleProvisioning(claim)

    strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
    klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
    return pluginName, err
  }

  klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

  // Create Kubernetes PV object for the volume.
  if volume.Name == "" {
    volume.Name = pvName
  }
  // Bind it to the claim
  volume.Spec.ClaimRef = claimRef
  volume.Status.Phase = v1.VolumeBound
  volume.Spec.StorageClassName = claimClass

  // Add AnnBoundByController (used in deleting the volume)
  metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")
  metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())

  // Try to create the PV object several times
  for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
    klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
    var newVol *v1.PersistentVolume
    if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {
      // Save succeeded.
      if err != nil {
        klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
        err = nil
      } else {
        klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

        _, updateErr := ctrl.storeVolumeUpdate(newVol)
        if updateErr != nil {
          // We will get an "volume added" event soon, this is not a big error
          klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
        }
      }
      break
    }
    // Save failed, try again after a while.
    klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
    time.Sleep(ctrl.createProvisionedPVInterval)
  }

  if err != nil {
    strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
    klog.V(3).Info(strerr)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

    var deleteErr error
    var deleted bool
    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
      _, deleted, deleteErr = ctrl.doDeleteVolume(volume)
      if deleteErr == nil && deleted {
        // Delete succeeded
        klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
        break
      }
      if !deleted {
        klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
        break
      }
      // Delete failed, try again after a while.
      klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
      time.Sleep(ctrl.createProvisionedPVInterval)
    }

    if deleteErr != nil {
      strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
      klog.V(2).Info(strerr)
      ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
    }
  } else {
    klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
    msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
    ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
  }
  return pluginName, nil
}
The basic logic of provisionClaimOperation is as follows:

  • check the driver: only CSI drivers may provision a claim that uses the dataSource field
  • add the claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation to the PVC
  • derive the PV name by the rule pv Name = "pvc-" + pvc.UID (see the sketch below)
  • if a PV with that name is found, it has already been provisioned, so provisioning is skipped
  • gather basic PVC/PV information and package it into options
  • validate the plugin: if the plugin does not support mount options while the StorageClass specifies them, the provision request is rejected outright
  • call plugin.NewProvisioner(options) to create a provisioner, which implements the Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) method; note that this method is synchronous
  • Provision returns a PersistentVolume instance
  • associate the new PV with the PVC object (via ClaimRef) and try to create the PV object, retrying several times
  • if creating the PV object fails, try to call Delete to remove the volume resource that was provisioned
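
The naming rule in the third bullet is simple enough to quote in full; getProvisionedVolumeNameForClaim is just:

  // getProvisionedVolumeNameForClaim returns the PV name the controller
  // uses for a dynamically provisioned volume: "pvc-" plus the claim's
  // UID, which is why dynamic PVs all look like pvc-<uuid>.
  func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {
    return "pvc-" + string(claim.UID)
  }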

syncBoundClaim

func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
  if claim.Spec.VolumeName == "" {
    // Claim was bound before but not any more.
    if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
      return err
    }
    return nil
  }
  obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
  if err != nil {
    return err
  }
  if !found {
    // Claim is bound to a non-existing volume.
    if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
      return err
    }
    return nil
  } else {
    volume, ok := obj.(*v1.PersistentVolume)
    if !ok {
      return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
    }

    klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
    if volume.Spec.ClaimRef == nil {
      // Claim is bound but volume has come unbound.
      // Or, a claim was bound and the controller has not received updated
      // volume yet. We can't distinguish these cases.
      // Bind the volume again and set all states to Bound.
      klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
      if err = ctrl.bind(volume, claim); err != nil {
        // Objects not saved, next syncPV or syncClaim will try again
        return err
      }
      return nil
    } else if volume.Spec.ClaimRef.UID == claim.UID {
      // All is well
      // NOTE: syncPV can handle this so it can be left out.
      // NOTE: bind() call here will do nothing in most cases as
      // everything should be already set.
      klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
      if err = ctrl.bind(volume, claim); err != nil {
        // Objects not saved, next syncPV or syncClaim will try again
        return err
      }
      return nil
    } else {
      // Claim is bound but volume has a different claimant.
      // Set the claim phase to 'Lost', which is a terminal
      // phase.
      if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
        return err
      }
      return nil
    }
  }
}

1) If pvc.Spec.VolumeName is empty, the PVC was bound before but no longer points to an existing PV; an event is emitted and the method returns.

2) The PV the PVC is bound to is looked up in the cache:

  • if it is not found, the PVC is bound to a PV that does not exist; an event is emitted and the method returns
  • if the PV is found:

    • check the pv.Spec.ClaimRef field: if it is nil, the PV has not been bound to the PVC yet, so ctrl.bind(volume, claim) is called to bind it
    • if pv.Spec.ClaimRef.UID == pvc.UID, bind is called anyway, but in most cases it returns immediately because everything has already been set up (a condensed sketch of bind follows below)
    • otherwise the volume is bound to a different PVC; the PVC's phase is updated to Lost and an event is emitted
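
Since both the bound and unbound paths end up in ctrl.bind, here is a condensed sketch of it (adapted from pv_controller.go; the real method keeps the updated objects returned by each save step and logs progress along the way):

  func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
    // 1. Point the PV at the claim (Spec.ClaimRef, plus AnnBoundByController
    //    when the controller created the binding).
    volume, err := ctrl.bindVolumeToClaim(volume, claim)
    if err != nil {
      return err
    }
    // 2. Move the PV to the Bound phase.
    if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
      return err
    }
    // 3. Point the PVC at the PV (Spec.VolumeName, bind-completed annotation).
    if claim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {
      return err
    }
    // 4. Move the PVC to the Bound phase.
    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {
      return err
    }
    return nil
  }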

IV Summary

Finally, here is a PVC/PV state transition diagram to wrap things up.


This article is original content of Alibaba Cloud and may not be reproduced without permission.
