Kubernetes pv-controller explained
Posted by 阿里云云栖号
Author | 牧琦
Source | 阿里技术公众号 (Alibaba Tech WeChat account)
Based on Kubernetes 1.23
I Introduction
The PV controller is one of the components of kube-controller-manager (kcm). It processes the PVC/PV objects in the cluster and drives their state transitions.
II pvController initialization
The initialization code lives in pkg/controller/volume/persistentvolume/pv_controller_base.go. NewController mainly does the following:
- Initializes the eventRecorder
- Initializes the PersistentVolumeController object
- Calls VolumePluginMgr.InitPlugins() to initialize the storage plugins; that code lives in pkg/volume/plugins.go
- Creates informers that watch resources in the cluster, namely:
- PersistentVolumeInformer
- PersistentVolumeClaimInformer
- StorageClassInformer
- PodInformer
- NodeInformer
- Registers handlers that put PV & PVC events into volumeQueue & claimQueue respectively
- Builds a custom indexer that indexes pods by PVC key, so the controller does not have to iterate over all pods every time
- Initializes the manager for the in-tree storage -> CSI migration features
NewController is called from the cmd/kube-controller-manager code; right after successful initialization, go Run() is invoked to start the pvController. The sketch below illustrates this wiring.
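As a minimal, hedged sketch (not the actual kcm wiring; the index name byPVC and the setup helper are illustrative), the event-to-queue plumbing and the pod-by-PVC indexer look roughly like this with plain client-go:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// setup wires PV events into a work queue and registers a pod indexer
// keyed by referenced PVC, mirroring what NewController does internally.
func setup(client kubernetes.Interface) (informers.SharedInformerFactory, workqueue.Interface) {
    factory := informers.NewSharedInformerFactory(client, 0)
    volumeQueue := workqueue.NewNamed("volumes")

    // Every add/update/delete becomes a "namespace/name" key on the queue.
    enqueue := func(obj interface{}) {
        if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
            volumeQueue.Add(key)
        }
    }
    factory.Core().V1().PersistentVolumes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    enqueue,
        UpdateFunc: func(_, newObj interface{}) { enqueue(newObj) },
        DeleteFunc: enqueue,
    })

    // Index pods by the PVCs they mount, so the pods using a given PVC can
    // be looked up without iterating every pod in the cluster.
    _ = factory.Core().V1().Pods().Informer().AddIndexers(cache.Indexers{
        "byPVC": func(obj interface{}) ([]string, error) {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                return nil, nil
            }
            var keys []string
            for _, vol := range pod.Spec.Volumes {
                if src := vol.PersistentVolumeClaim; src != nil {
                    keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, src.ClaimName))
                }
            }
            return keys, nil
        },
    })
    return factory, volumeQueue
}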
III Running the controller
// Run starts the pvController.
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()

    klog.Infof("Starting persistent volume controller")
    defer klog.Infof("Shutting down persistent volume controller")

    if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
        return
    }

    ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

    go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
    go wait.Until(ctrl.claimWorker, time.Second, stopCh)

    metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)

    <-stopCh
}
After the caches are synced, ctrl.resync, ctrl.volumeWorker and ctrl.claimWorker start running periodically. Let's look at the initializeCaches method first:
func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
    // No apiserver access here: these objects come from the local informer
    // cache and must not be modified by outside functions.
    volumeList, err := volumeLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
        return
    }
    for _, volume := range volumeList {
        // The informer's volume object must not be mutated, so make a deep
        // copy and operate on that instead.
        volumeClone := volume.DeepCopy()
        if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
            klog.Errorf("error updating volume cache: %v", err)
        }
    }

    claimList, err := claimLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
        return
    }
    for _, claim := range claimList {
        if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
            klog.Errorf("error updating claim cache: %v", err)
        }
    }
    klog.V(4).Infof("controller initialized")
}
type persistentVolumeOrderedIndex struct {
    store cache.Indexer
}
This method copies the lister caches into a persistentVolumeOrderedIndex, a cache of PersistentVolumes indexed by AccessModes and ordered by storage capacity.
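For intuition, an access-mode index over PVs can be built with client-go's cache.NewIndexer. The following is a hedged simplification of what the controller's index.go does (the real code formats the access-mode set with a helper and sorts matches by capacity before picking one):

import (
    "strings"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
)

// accessModesIndexFunc groups PVs by their access-mode set, e.g.
// "ReadWriteOnce" or "ReadWriteOnce,ReadOnlyMany".
func accessModesIndexFunc(obj interface{}) ([]string, error) {
    if pv, ok := obj.(*v1.PersistentVolume); ok {
        modes := make([]string, 0, len(pv.Spec.AccessModes))
        for _, m := range pv.Spec.AccessModes {
            modes = append(modes, string(m))
        }
        return []string{strings.Join(modes, ",")}, nil
    }
    return []string{""}, nil
}

func newOrderedIndex() cache.Indexer {
    return cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})
}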
1 resync
func (ctrl *PersistentVolumeController) resync() {
    klog.V(4).Infof("resyncing PV controller")

    pvcs, err := ctrl.claimLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list claims: %s", err)
        return
    }
    for _, pvc := range pvcs {
        ctrl.enqueueWork(ctrl.claimQueue, pvc)
    }

    pvs, err := ctrl.volumeLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list persistent volumes: %s", err)
        return
    }
    for _, pv := range pvs {
        ctrl.enqueueWork(ctrl.volumeQueue, pv)
    }
}
This puts every PVC/PV in the cluster back onto the claimQueue and volumeQueue for reprocessing. The resyncPeriod equals a random time.Duration factor multiplied by the sync period configured when kcm starts.
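enqueueWork itself is small; this close paraphrase (hedged, of the helper in pv_controller_base.go) shows that only string keys, not objects, travel through the queues, and that deletion tombstones are unwrapped first:

func enqueueWork(queue workqueue.Interface, obj interface{}) {
    // Beware of "xxx deleted" events: unwrap the tombstone to get the object.
    if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
        obj = unknown.Obj
    }
    objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        klog.Errorf("failed to get key from object: %v", err)
        return
    }
    klog.V(5).Infof("enqueued %q for sync", objName)
    queue.Add(objName)
}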
2 volumeWorker
volumeWorker is an infinite loop that keeps processing PersistentVolumes picked off the volumeQueue:
workFunc := func() bool {
    keyObj, quit := ctrl.volumeQueue.Get()
    if quit {
        return true
    }
    defer ctrl.volumeQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("volumeWorker[%s]", key)

    _, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
        return false
    }
    volume, err := ctrl.volumeLister.Get(name)
    if err == nil {
        // The volume still exists in informer cache, the event must have
        // been add/update/sync
        ctrl.updateVolume(volume)
        return false
    }
    if !errors.IsNotFound(err) {
        klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
        return false
    }

    // The volume is not in informer cache, the event must have been
    // "delete"
    volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
    if err != nil {
        klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
        return false
    }
    if !found {
        // The controller has already processed the delete event and
        // deleted the volume from its cache
        klog.V(2).Infof("deletion of volume %q was already processed", key)
        return false
    }
    volume, ok := volumeObj.(*v1.PersistentVolume)
    if !ok {
        klog.Errorf("expected volume, got %+v", volumeObj)
        return false
    }
    ctrl.deleteVolume(volume)
    return false
}
We mainly care about the ctrl.updateVolume(volume) method.
updateVolume
updateVolume is the actual handler for volume events from the cluster; it mainly delegates to ctrl.syncVolume, shown below.
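Condensed to its essence (a hedged paraphrase; the real method also distinguishes expected from unexpected sync errors), updateVolume first records the object version in the local cache, skips stale versions, then hands the volume to syncVolume:

func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
    // Store the new volume version in the cache; do nothing if the cache
    // already holds this or a newer version.
    new, err := ctrl.storeVolumeUpdate(volume)
    if err != nil {
        klog.Errorf("%v", err)
    }
    if !new {
        return
    }
    if err := ctrl.syncVolume(context.TODO(), volume); err != nil {
        klog.Errorf("could not sync volume %q: %+v", volume.Name, err)
    }
}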
func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
    ...
    // [Unit test set 4]
    if volume.Spec.ClaimRef == nil {
        // Volume is unused
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
        if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
            // Nothing was saved; we will fall back into the same
            // condition in the next call to this method
            return err
        }
        return nil
    } else /* pv.Spec.ClaimRef != nil */ {
        // Volume is bound to a claim.
        if volume.Spec.ClaimRef.UID == "" {
            // The PV is reserved for a PVC; that PVC has not yet been
            // bound to this PV; the PVC sync will handle it.
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        }
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
        // Get the PVC by _name_
        var claim *v1.PersistentVolumeClaim
        claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
        obj, found, err := ctrl.claims.GetByKey(claimName)
        if err != nil {
            return err
        }
        if !found {
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
                if err != nil && !apierrors.IsNotFound(err) {
                    return err
                }
                found = !apierrors.IsNotFound(err)
                if !found {
                    obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
                    if err != nil && !apierrors.IsNotFound(err) {
                        return err
                    }
                    found = !apierrors.IsNotFound(err)
                }
            }
        }
        if !found {
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            // Fall through with claim = nil
        } else {
            var ok bool
            claim, ok = obj.(*v1.PersistentVolumeClaim)
            if !ok {
                return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
        }
        if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
            klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))

            claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
            if err != nil && !apierrors.IsNotFound(err) {
                return err
            } else if claim != nil {
                // Treat the volume as bound to a missing claim.
                if claim.UID != volume.Spec.ClaimRef.UID {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
                    claim = nil
                } else {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }

        if claim == nil {
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                // Also, log this only once:
                klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
                if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                    // Nothing was saved; we will fall back into the same condition
                    // in the next call to this method
                    return err
                }
            }
            if err = ctrl.reclaimVolume(volume); err != nil {
                // Release failed, we will fall back into the same condition
                // in the next call to this method
                return err
            }
            if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
                // volume is being retained, it references a claim that does not exist now.
                klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)
            }
            return nil
        } else if claim.Spec.VolumeName == "" {
            if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
                volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
                ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
                claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
                ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
                // Skipping syncClaim
                return nil
            }

            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                // The binding is not completed; let PVC sync handle it
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
            } else {
                // Dangling PV; try to re-establish the link in the PVC sync
                klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
            }
            ctrl.claimQueue.Add(claimToClaimKey(claim))
            return nil
        } else if claim.Spec.VolumeName == volume.Name {
            // Volume is bound to a claim properly, update status if necessary
            klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
            if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        } else {
            // Volume is bound to a claim, but the claim is bound elsewhere
            if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
                if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                    // Also, log this only once:
                    klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
                    if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                        // Nothing was saved; we will fall back into the same condition
                        // in the next call to this method
                        return err
                    }
                }
                if err = ctrl.reclaimVolume(volume); err != nil {
                    return err
                }
                return nil
            } else {
                if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                } else {
                    // The PV must have been created with this ptr; leave it alone.
                    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                }
            }
        }
    }
}
1. When pv.Spec.ClaimRef is nil, the PV is unused; ctrl.updateVolumePhase moves it to the Available phase.
2. When pv.Spec.ClaimRef is non-nil, the PV is bound (or reserved) to a PVC:
- When Spec.ClaimRef.UID is empty, the PVC has not bound the PV yet; ctrl.updateVolumePhase moves the PV to Available and the method returns, leaving the rest to the PVC's syncClaim (see the reserved-PV example after this list).
- The PVC referenced by Spec.ClaimRef is then looked up in the pv_controller's own cache.
- If the PVC is not found:
- The cache may simply be stale under heavy cluster load, so the informer cache is consulted next, and the apiserver after that.
- If a PV that is neither Released nor Failed still has no PVC after all those lookups, the PVC was deleted. Recent Kubernetes versions then check the reclaimPolicy and transition the PV's state accordingly.
- If the PVC is found:
1) If the PVC's UID differs from Spec.ClaimRef.UID, the PVC the PV points to was likely deleted and a new PVC with the same name was created immediately, with the cache not yet updated. A double check against the apiserver is performed; if the UIDs still differ, the PV is bound to a PVC that no longer exists, claim is set to nil, and the "PVC not found" logic above applies.
2) If the PVC's volumeName is empty:
- Check whether the PVC's volumeMode matches the PV's; if not, emit events on both objects.
- If the PV carries the annotation AnnBoundByController = "pv.kubernetes.io/bound-by-controller", the PVC/PV binding is still in progress.
- Put the PVC into the claimQueue and let claimWorker handle it.
3) If pvc.Spec.VolumeName == pv.Name, simply set the PV to the Bound phase.
4) If pvc.Spec.VolumeName != pv.Name:
- If the PV was dynamically provisioned and its ReclaimPolicy is Delete, the PVC has already bound another PV; move this PV to the Released phase and let the deleters remove it.
- If the PV was not dynamically provisioned, clear its ClaimRef field and unbind it.
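For intuition about the pre-bound case in point 2, this is what a reserved PV looks like as a Go object (illustrative names; only the relevant fields are set):

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// A PV reserved for a PVC by namespace/name only. Because ClaimRef.UID is
// empty, syncVolume keeps this PV in the Available phase and leaves the
// actual binding to the PVC's syncClaim.
var reservedPV = &v1.PersistentVolume{
    ObjectMeta: metav1.ObjectMeta{Name: "pv-demo"},
    Spec: v1.PersistentVolumeSpec{
        ClaimRef: &v1.ObjectReference{
            Namespace: "default",
            Name:      "pvc-demo",
            // UID intentionally left empty: reserved, not yet bound.
        },
    },
}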
3 claimWorker
claimWorker is an infinite loop that keeps processing PersistentVolumeClaims picked off the claimQueue:
workFunc := func() bool {
    keyObj, quit := ctrl.claimQueue.Get()
    if quit {
        return true
    }
    defer ctrl.claimQueue.Done(keyObj)
    key := keyObj.(string)
    klog.V(5).Infof("claimWorker[%s]", key)

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
        return false
    }
    claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
    if err == nil {
        // The claim still exists in informer cache, the event must have
        // been add/update/sync
        ctrl.updateClaim(claim)
        return false
    }
    if !errors.IsNotFound(err) {
        klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
        return false
    }

    // The claim is not in informer cache, the event must have been "delete"
    claimObj, found, err := ctrl.claims.GetByKey(key)
    if err != nil {
        klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
        return false
    }
    if !found {
        // The controller has already processed the delete event and
        // deleted the claim from its cache
        klog.V(2).Infof("deletion of claim %q was already processed", key)
        return false
    }
    claim, ok := claimObj.(*v1.PersistentVolumeClaim)
    if !ok {
        klog.Errorf("expected claim, got %+v", claimObj)
        return false
    }
    ctrl.deleteClaim(claim)
    return false
}
We mainly care about ctrl.updateClaim(claim). As with volumes, it delegates to ctrl.syncClaim, which then calls either ctrl.syncUnboundClaim or ctrl.syncBoundClaim depending on the PVC's state.
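That dispatch hinges on the pv.kubernetes.io/bind-completed annotation; condensed (a hedged sketch, cache housekeeping omitted), syncClaim is just:

func (ctrl *PersistentVolumeController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    // Claims without the bind-completed annotation are still being bound;
    // everything else is treated as already bound.
    if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
        return ctrl.syncUnboundClaim(ctx, claim)
    }
    return ctrl.syncBoundClaim(claim)
}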
syncUnboundClaim
func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    if claim.Spec.VolumeName == "" {
        // User did not care which PV they get.
        delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
        if err != nil {
            return err
        }

        // [Unit test set 1]
        volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
        if err != nil {
            klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
            return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)
        }
        if volume == nil {
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
            switch {
            case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
                if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
                    return err
                }
            case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":
                if err = ctrl.provisionClaim(ctx, claim); err != nil {
                    return err
                }
                return nil
            default:
                ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
            }

            // Mark the claim as Pending and try to find a match in the next
            // periodic syncClaim
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else /* pv != nil */ {
            // Found a PV for this claim
            // OBSERVATION: pvc is "Pending", pv is "Available"
            claimKey := claimToClaimKey(claim)
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
            if err = ctrl.bind(volume, claim); err != nil {
                metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
                return err
            }
            metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
            return nil
        }
    } else /* pvc.Spec.VolumeName != nil */ {
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
        obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
        if err != nil {
            return err
        }
        if !found {
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else {
            volume, ok := obj.(*v1.PersistentVolume)
            if !ok {
                return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
            }
            klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
            if volume.Spec.ClaimRef == nil {
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
                if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
                    klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
                    // send an event
                    msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
                    // volume does not satisfy the requirements of the claim
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                } else if err = ctrl.bind(volume, claim); err != nil {
                    // On any error saving the volume or the claim, subsequent
                    // syncClaim will finish the binding.
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else if pvutil.IsVolumeBoundToClaim(volume, claim) {
                // User asked for a PV that is claimed by this PVC
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
                // Finish the volume binding by adding claim UID.
                if err = ctrl.bind(volume, claim); err != nil {
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else {
                // User asked for a PV that is claimed by someone else
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
                    claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
                    // User asked for a specific PV, retry later
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                    return nil
                } else {
                    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
                    claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
                    return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }
    }
}
Let's walk through the overall flow.
- If the PVC's volumeName is empty:
- Determine whether the PVC uses delayed binding.
- Call volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) to find a matching PV.
- If a volume is found, call ctrl.bind(volume, claim) to bind the pair.
- If no volume is found:
- If binding is delayed and not yet triggered (no pod references the PVC), emit an event on the PVC.
- If the PVC references a StorageClass, call ctrl.provisionClaim(ctx, claim), which:
- parses the PVC to find the provisioner driver,
- starts a goroutine,
- calls ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) to do the actual provisioning.
(A simplified sketch of the volume-matching idea follows.)
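As promised above, here is a deliberately simplified sketch of the idea behind findBestMatchForClaim: among Available PVs whose capacity satisfies the request, choose the smallest. The real implementation additionally filters on storage class, label selector, volumeMode, node affinity and the delayed-binding flag:

import v1 "k8s.io/api/core/v1"

// smallestMatch returns the smallest Available PV that is large enough for
// the claim's storage request, or nil when none qualifies.
func smallestMatch(pvs []*v1.PersistentVolume, claim *v1.PersistentVolumeClaim) *v1.PersistentVolume {
    request := claim.Spec.Resources.Requests[v1.ResourceStorage]
    var best *v1.PersistentVolume
    for _, pv := range pvs {
        capacity := pv.Spec.Capacity[v1.ResourceStorage]
        if pv.Status.Phase != v1.VolumeAvailable || capacity.Cmp(request) < 0 {
            continue // not available, or too small
        }
        if best == nil {
            best = pv
            continue
        }
        bestCapacity := best.Spec.Capacity[v1.ResourceStorage]
        if capacity.Cmp(bestCapacity) < 0 {
            best = pv
        }
    }
    return best
}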
provisionClaimOperation
func (ctrl *PersistentVolumeController) provisionClaimOperation(
    ctx context.Context,
    claim *v1.PersistentVolumeClaim,
    plugin vol.ProvisionableVolumePlugin,
    storageClass *storage.StorageClass) (string, error) {
    claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)
    klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

    pluginName := plugin.GetPluginName()
    if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil {
        strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)
        return pluginName, fmt.Errorf(strerr)
    }
    provisionerName := storageClass.Provisioner

    // Add provisioner annotation to be consistent with external provisioner workflow
    newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)
    if err != nil {
        // Save failed, the controller will retry in the next sync
        klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
        return pluginName, err
    }
    claim = newClaim

    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
    if err != nil && !apierrors.IsNotFound(err) {
        klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)
        return pluginName, err
    }
    if err == nil && volume != nil {
        // Volume has been already provisioned, nothing to do.
        klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
        return pluginName, err
    }

    // Prepare a claimRef to the claim early (to fail before a volume is
    // provisioned)
    claimRef, err := ref.GetReference(scheme.Scheme, claim)
    if err != nil {
        klog.V(3).Infof("unexpected error getting claim reference: %v", err)
        return pluginName, err
    }

    // Gather provisioning options
    tags := make(map[string]string)
    tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
    tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
    tags[CloudVolumeCreatedForVolumeNameTag] = pvName

    options := vol.VolumeOptions{
        PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
        MountOptions:                  storageClass.MountOptions,
        CloudTags:                     &tags,
        ClusterName:                   ctrl.clusterName,
        PVName:                        pvName,
        PVC:                           claim,
        Parameters:                    storageClass.Parameters,
    }

    // Refuse to provision if the plugin doesn't support mount options, creation
    // of PV would be rejected by validation anyway
    if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
        strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
        klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
    }

    // Provision the volume
    provisioner, err := plugin.NewProvisioner(options)
    if err != nil {
        strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
        klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    var selectedNode *v1.Node = nil
    if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
        selectedNode, err = ctrl.NodeLister.Get(nodeName)
        if err != nil {
            strerr := fmt.Sprintf("Failed to get target node: %v", err)
            klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
            return pluginName, err
        }
    }
    allowedTopologies := storageClass.AllowedTopologies

    opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
    volume, err = provisioner.Provision(selectedNode, allowedTopologies)
    opComplete(volumetypes.CompleteFuncParam{Err: &err})
    if err != nil {
        ctrl.rescheduleProvisioning(claim)

        strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
        klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

    // Create Kubernetes PV object for the volume.
    if volume.Name == "" {
        volume.Name = pvName
    }
    // Bind it to the claim
    volume.Spec.ClaimRef = claimRef
    volume.Status.Phase = v1.VolumeBound
    volume.Spec.StorageClassName = claimClass

    // Add AnnBoundByController (used in deleting the volume)
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes")
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())

    // Try to create the PV object several times
    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
        klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
        var newVol *v1.PersistentVolume
        if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {
            // Save succeeded.
            if err != nil {
                klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
                err = nil
            } else {
                klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

                _, updateErr := ctrl.storeVolumeUpdate(newVol)
                if updateErr != nil {
                    // We will get an "volume added" event soon, this is not a big error
                    klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
                }
            }
            break
        }
        // Save failed, try again after a while.
        klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
        time.Sleep(ctrl.createProvisionedPVInterval)
    }

    if err != nil {
        strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
        klog.V(3).Info(strerr)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        var deleteErr error
        var deleted bool
        for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
            _, deleted, deleteErr = ctrl.doDeleteVolume(volume)
            if deleteErr == nil && deleted {
                // Delete succeeded
                klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
                break
            }
            if !deleted {
                klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
                break
            }
            // Delete failed, try again after a while.
            klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
            time.Sleep(ctrl.createProvisionedPVInterval)
        }

        if deleteErr != nil {
            strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
            klog.V(2).Info(strerr)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
        }
    } else {
        klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
        msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
    }
    return pluginName, nil
}
The basic logic of provisionClaimOperation is as follows:
- Check the driver: only a CSI driver may provision a claim that uses the dataSource field.
- Add the claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation to the PVC.
- Derive the PV name by rule: pvName = "pvc-" + pvc.UID (see the helper after this list).
- If a PV with that name is found, it already exists, so provisioning is skipped.
- Gather basic PVC/PV information and package it into options.
- Validate the plugin: if it does not support mount options while the StorageClass specifies some, reject the provision request outright.
- Call plugin.NewProvisioner(options) to create a provisioner, whose interface implements the Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) method. Note that this method is synchronous.
- Provision returns a PersistentVolume instance.
- Point the new PV at the PVC (ClaimRef) and try to create the PV object, retrying several times.
- If creating the PV fails, try to call the Delete method to remove the volume resource that was provisioned.
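The deterministic PV name in step 3 is what makes the "already exists, skip provisioning" check in step 4 safe across retries; the rule (mirroring getProvisionedVolumeNameForClaim) is simply:

// The provisioned PV name is derived from the claim UID, so repeated calls
// for the same PVC always target the same PV object.
func getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {
    return "pvc-" + string(claim.UID)
}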
syncBoundClaim
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
    if claim.Spec.VolumeName == "" {
        // Claim was bound before but not any more.
        if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    }
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
        return err
    }
    if !found {
        // Claim is bound to a non-existing volume.
        if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    } else {
        volume, ok := obj.(*v1.PersistentVolume)
        if !ok {
            return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
        }

        klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
        if volume.Spec.ClaimRef == nil {
            // Claim is bound but volume has come unbound.
            // Or, a claim was bound and the controller has not received updated
            // volume yet. We can't distinguish these cases.
            // Bind the volume again and set all states to Bound.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else if volume.Spec.ClaimRef.UID == claim.UID {
            // All is well
            // NOTE: syncPV can handle this so it can be left out.
            // NOTE: bind() call here will do nothing in most cases as
            // everything should be already set.
            klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else {
            // Claim is bound but volume has a different claimant.
            // Set the claim phase to 'Lost', which is a terminal
            // phase.
            if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
                return err
            }
            return nil
        }
    }
}
1) If pvc.Spec.VolumeName is empty, the PVC was bound before but no longer points at any PV; emit an event and return.
2) Look up the PV the PVC is bound to in the cache:
- If it is not found, the PVC is bound to a PV that does not exist; emit an event and return.
- If the PV is found:
- Check pv.Spec.ClaimRef: if it is nil, the PV has not bound the PVC yet, so call ctrl.bind(volume, claim) to bind them.
- If pv.ClaimRef.UID == pvc.UID, call bind as well; in most cases it returns immediately because everything is already in place.
- Otherwise the volume is bound to some other PVC: update the PVC's phase to Lost and emit an event.
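The UID comparison in case 2 is the same test that pvutil.IsVolumeBoundToClaim applies during binding; restated as a hedged paraphrase:

// isVolumeBoundToClaim reports whether the PV's ClaimRef points at exactly
// this claim: namespace, name and (when set) UID must all match.
func isVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool {
    if volume.Spec.ClaimRef == nil {
        return false
    }
    if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
        return false
    }
    if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
        return false
    }
    return true
}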
IV Summary
Finally, here is a PVC/PV state transition diagram to sum things up.
This article is original content from Alibaba Cloud and may not be reproduced without permission.