kube-scheduler 磁盘调度源码分析

Posted CSDN云计算

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kube-scheduler 磁盘调度源码分析相关的知识,希望对你有一定的参考价值。

作者 | leadersnowy

来源 | CSDN博客

kube-scheduler介绍

首先我们知道,kube-scheduler的根本工作任务是根据各种调度算法将Pod调度到最合适的工作节点上

一、整个调度流程分为两个阶段:

1、预选(Predicates):

输入是所有节点,输出是满足预选条件的节点。kube-scheduler根据预选策略过滤掉不满足策略的Nodes。例如,如果某节点的资源不足或者不满足预选策略的条件如“Node的label必须与Pod的Selector一致”时则无法通过预选。

2、优选(Priorities):

输入是预选阶段筛选出的节点,优选会根据优先策略为通过预选的Nodes进行打分排名,选择得分最高的Node。例如,资源越富裕、负载越小的Node可能具有越高的排名。

而整个调度过程是有很多因素影响的,包括节点状态(cpu,内存,磁盘),节点与pod的亲和性,节点标签等,而我们本次只做与磁盘相关的源码分析。

源码分析

2.1、磁盘预选(Predicates)

这一部分核心代码在pkg/scheduler/framework/plugins/nodevolumelimits/这个目录下,分两个文件

csi.go,non_csi.go 两个文件的核心思想基本一致,都是通过Filter来限制一个节点上的最大盘的数量,这个最大数量是默认配置在k8s源码中的。csi的就是限制一个节点上最多能挂几块csi的volume,而non_csi的相对比较复杂,因为牵扯到in-tree插件跟flexvolume插件,而且对每一种in-tree插件都进行了分别的可挂载盘的数量的配置,下面我们对于non-csi的进行具体的源码分析。

话不多说,直接上代码:

2.1.1 nonCSILimits

type nonCSILimits struct 
  name           string
  filter         VolumeFilter
  volumeLimitKey v1.ResourceName
  maxVolumeFunc  func(node *v1.Node) int
  csiNodeLister  storagelisters.CSINodeLister
  pvLister       corelisters.PersistentVolumeLister
  pvcLister      corelisters.PersistentVolumeClaimLister
  scLister       storagelisters.StorageClassLister


  // The string below is generated randomly during the struct's initialization.
  // It is used to prefix volumeID generated inside the predicate() method to
  // avoid conflicts with any real volume.
  randomVolumeIDPrefix string

最核心的结构体nonCSILimits ,提供了几个成员

  1. name 顾名思义,每一个nonCSILimits结构体变量的名称

  2. filter VolumeFilter类型的变量,具体包括几个方法,FilterVolume(),FilterPersistentVolume(),MatchProvisioner()以及IsMigrated()具体调用我们后面再讲

  3. volumeLimitKey 其实就是一个string类型,指定了几种类型的key

  4. maxVolumeFunc() 一个获取该Limits的最大数量的方法

  5. csiNodeLister csinode的监听对象

  6. pvLister pv的监听对象

  7. pvcLister pvc的监听对象

  8. scLister sc的监听对象

  9. randomVolumeIDPrefix 一个string类型,用于生成唯一的pvID

2.1.2 nonCSILimits初始化

func newNonCSILimits(
  filterName string,
  csiNodeLister storagelisters.CSINodeLister,
  scLister storagelisters.StorageClassLister,
  pvLister corelisters.PersistentVolumeLister,
  pvcLister corelisters.PersistentVolumeClaimLister,
) framework.Plugin 
  var filter VolumeFilter
  var volumeLimitKey v1.ResourceName
  var name string


  switch filterName 
  case ebsVolumeFilterType:
    name = EBSName
    filter = ebsVolumeFilter
    volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
  case gcePDVolumeFilterType:
    name = GCEPDName
    filter = gcePDVolumeFilter
    volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
  case azureDiskVolumeFilterType:
    name = AzureDiskName
    filter = azureDiskVolumeFilter
    volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
  case indiskVolumeFilterType:
    name = IndiskName
    filter = indiskVolumeFilter
    volumeLimitKey = v1.ResourceName(volumeutil.IndiskVolumeLimitKey)
  default:
    klog.Fatalf("Wrong filterName, Only Support %v %v %v %v %v", ebsVolumeFilterType,
      gcePDVolumeFilterType, azureDiskVolumeFilterType, cinderVolumeFilterType, indiskVolumeFilterType)
    return nil
  
  pl := &nonCSILimits
    name:                 name,
    filter:               filter,
    volumeLimitKey:       volumeLimitKey,
    maxVolumeFunc:        getMaxVolumeFunc(filterName),
    csiNodeLister:        csiNodeLister,
    pvLister:             pvLister,
    pvcLister:            pvcLister,
    scLister:             scLister,
    randomVolumeIDPrefix: rand.String(32),
  


  return pl

初始化时引用了大量的常量

const (
  // defaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.
  // GCE instances can have up to 16 PD volumes attached.
  defaultMaxGCEPDVolumes = 16
  // defaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.
  // Larger Azure VMs can actually have much more disks attached.
  // TODO We should determine the max based on VM size
  defaultMaxAzureDiskVolumes = 16
  // ebsVolumeFilterType defines the filter name for ebsVolumeFilter.
  ebsVolumeFilterType = "EBS"
  // gcePDVolumeFilterType defines the filter name for gcePDVolumeFilter.
  gcePDVolumeFilterType = "GCE"
  // azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.
  azureDiskVolumeFilterType = "AzureDisk"
  // cinderVolumeFilterType defines the filter name for cinderVolumeFilter.
  indiskVolumeFilterType = "Indisk"
  // ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
  ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"
  // KubeMaxPDVols defines the maximum number of PD Volumes per kubelet.
  KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
  // MaxIndiskVolumes defines the maximum number of Indisk Volumes per node.
  MaxIndiskVolumes = "MAX_INDISK_VOLUMES"
  // DefaultIndiskVolumes defines the default number of Indisk Volumes per node.
  DefaultIndiskVolumes = "DEFAULT_INDISK_VOLUMES"
)

这些常量包括各intree存储类型的名字,以及默认的最大volume数量等信息,在nonCSILimits初始化时进行赋值。

除了这些常量之外,最关键的就是filter方法的初始化,看一下cinder的例子,是怎么区分这个volume是cinder的volume的。

var cinderVolumeFilter = VolumeFilter
  FilterVolume: func(vol *v1.Volume) (string, bool) 
    if vol.Cinder != nil 
      return vol.Cinder.VolumeID, true
    
    return "", false
  ,


  FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) 
    if pv.Spec.Cinder != nil 
      return pv.Spec.Cinder.VolumeID, true
    
    return "", false
  ,


  MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) 
    if sc.Provisioner == csilibplugins.CinderInTreePluginName 
      return true
    
    return false
  ,


  IsMigrated: func(csiNode *storage.CSINode) bool 
    return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
  ,

可以看到,filer其实是通过对元数据的字段(vol.Cinder,pv.Spec.Cinder,sc.Provisioner)等进行判断来分辨是不是本类型的volume的。

如果不是intree的,可以通过其它的字段来进行过滤

Flexvolume的我们可以用这些字段:

vol.FlexVolume.Driver,pv.Spec.FlexVolume.Driver

CSI的我们可以用这些字段:

vol.CSI.Driver,pv.Spec.CSI.Driver

2.1.3 核心方法Filter

// Filter invoked at the filter extension point.
func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status 
  // If a pod doesn't have any volume attached to it, the predicate will always be true.
  // Thus we make a fast path for it, to avoid unnecessary computations in this case.
  if len(pod.Spec.Volumes) == 0 
    return nil
  


  newVolumes := make(map[string]bool)
  if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil 
    return framework.NewStatus(framework.Error, err.Error())
  


  // quick return
  if len(newVolumes) == 0 
    return nil
  


  node := nodeInfo.Node()
  if node == nil 
    return framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))
  


  var csiNode *storage.CSINode
  var err error
  if pl.csiNodeLister != nil 
    csiNode, err = pl.csiNodeLister.Get(node.Name)
    if err != nil 
      // we don't fail here because the CSINode object is only necessary
      // for determining whether the migration is enabled or not
      klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
    
  


  // if a plugin has been migrated to a CSI driver, defer to the CSI predicate
  if pl.filter.IsMigrated(csiNode) 
    return nil
  


  // count unique volumes
  existingVolumes := make(map[string]bool)
  for _, existingPod := range nodeInfo.Pods 
    if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil 
      return framework.NewStatus(framework.Error, err.Error())
    
  
  numExistingVolumes := len(existingVolumes)


  // filter out already-mounted volumes
  for k := range existingVolumes 
    delete(newVolumes, k)
  


  numNewVolumes := len(newVolumes)
  maxAttachLimit := pl.maxVolumeFunc(node)
  volumeLimits := volumeLimits(nodeInfo)
  if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok 
    maxAttachLimit = int(maxAttachLimitFromAllocatable)
  


  if numExistingVolumes+numNewVolumes > maxAttachLimit 
    return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
  
  if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) 
    nodeInfo.TransientInfo.TransientLock.Lock()
    defer nodeInfo.TransientInfo.TransientLock.Unlock()
    nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
    nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
  
  return nil

接下来看一下核心代码的逻辑

最开始是一些校验,首先校验这个pod有没有使用volume,再校验这个pod使用的volume是什么类型的volume,然后由对应类型的volume的filter进行处理。

看一下最核心的处理流程

numExistingVolumes := len(existingVolumes)


  // filter out already-mounted volumes
  for k := range existingVolumes 
    delete(newVolumes, k)
  
    numNewVolumes := len(newVolumes)
  maxAttachLimit := pl.maxVolumeFunc(node)
  volumeLimits := volumeLimits(nodeInfo)
  if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok 
    maxAttachLimit = int(maxAttachLimitFromAllocatable)
  


  if numExistingVolumes+numNewVolumes > maxAttachLimit 
    return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
  
  if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) 
    nodeInfo.TransientInfo.TransientLock.Lock()
    defer nodeInfo.TransientInfo.TransientLock.Unlock()
    nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
    nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
  
  return nil

逻辑非常清楚

  1. numExistingVolumes 表示该节点上已经有的volume数

  2. numNewVolumes是该pod需要创建的volume数

  3. maxAttachLimit 是该类型的volume在节点上所能创建的最大volume数

  4. 如果 numExistingVolumes+numNewVolumes > maxAttachLimit 则该节点不可调度

  5. 如果可以调度,则return nil,然后释放资源

到此,预选阶段的工作做完。

这里还需要注意一个地方

nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
    nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes

AllocatableVolumesCount,RequestedVolumes这两个变量的赋值是给后面进行优选打分的时候用的

2.2 磁盘优选(Priorities)

优选打分的条件也很多,这里我们只处理跟存储相关的,具体源码在

pkg\\scheduler\\framework\\plugins\\noderesources\\resource_allocation.go

pkg\\scheduler\\framework\\plugins\\noderesources\\least_allocated.go

2.2.1 resource_allocation

resource_allocation 字体意思就可以理解,就是资源分配,而资源分配有多种方,比如least_allocated,most_allocated,requested_to_capacity_ratio都是资源分配的方法,而resource_allocation只是提供一个score方法,代码如下:

func (r *resourceAllocationScorer) score(
  pod *v1.Pod,
  nodeInfo *framework.NodeInfo) (int64, *framework.Status) 
  node := nodeInfo.Node()
  if node == nil 
    return 0, framework.NewStatus(framework.Error, "node not found")
  
  if r.resourceToWeightMap == nil 
    return 0, framework.NewStatus(framework.Error, "resources not found")
  
  requested := make(resourceToValueMap, len(r.resourceToWeightMap))
  allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
  for resource := range r.resourceToWeightMap 
    allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
  
  var score int64


  // Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
  if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil 
    score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
   else 
    score = r.scorer(requested, allocatable, false, 0, 0)
  
  if klog.V(10).Enabled() 
    if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil 
      klog.Infof(
        "%v -> %v: %v, map of allocatable resources %v, map of requested resources %v , allocatable volumes %d, requested volumes %d, score %d",
        pod.Name, node.Name, r.Name,
        allocatable, requested, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,
        nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,
        score,
      )
     else 
      klog.Infof(
        "%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",
        pod.Name, node.Name, r.Name,
        allocatable, requested, score,
      )


    
  
  return score, nil

核心代码就一句

if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil 
    score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
   else 
    score = r.scorer(requested, allocatable, false, 0, 0)
  

如果pod有volume并且BalanceAttachedNodeVolumes这个feature打开了,并且节点有TransientInfo

那么就走存储相关的打分,否则就不走。

我们看一下r.scorer的参数的后两个,就是我们在预选阶段最后赋值的两个参数

AllocatableVolumesCount 表示还可以创建的volume数量

RequestedVolumes 表示该pod需要的volume数量

2.2.2 least_allocated

least_allocated代码最少资源优先,也就是节点上资源越少,分越高

直接看源码

type resourceAllocationScorer struct 
   Name                string
   scorer              func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
   resourceToWeightMap resourceToWeightMap



func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 
   return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 
     var nodeScore, weightSum int64
     for resource, weight := range resToWeightMap 
       resourceScore := leastRequestedScore(requested[resource], allocable[resource])
       nodeScore += resourceScore * weight
       weightSum += weight
     
     return nodeScore / weightSum
   



// The unused capacity is calculated on a scale of 0-MaxNodeScore
// 0 being the lowest priority and `MaxNodeScore` being the highest.
// The more unused resources the higher the score is.
func leastRequestedScore(requested, capacity int64) int64 
   if capacity == 0 
     return 0
   
   if requested > capacity 
     return 0
   


   return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity

我们来看核心算法

capacity表示还可以剩余资源数量

requested表示该pod需求资源数量

比如:capacity=10,requested=3,framework.MaxNodeScorel默认是100

那么得分就是 (10-3)*100/10=70

但是我们看到 leastResourceScorer中并没有引用存储的部分,所以我们可以手动添加上

if includeVolumes && allocatableVolumes - requestedVolumes > 0 && allocatableVolumes > 0 
      nodeScore += int64(((allocatableVolumes - requestedVolumes) * int(framework.MaxNodeScore)) / allocatableVolumes)
      weightSum += 1
    

至此,kube-scheduler存储部分代码解读。

往期推荐

从 40% 跌至 4%,“糊”了的 Firefox 还能重回巅峰吗?

Gartner 发布 2022 年汽车行业五大技术趋势

使用这个库,让你的服务操作 Redis 速度飞起

漫画:什么是“低代码”开发平台?

点分享

点收藏

点点赞

点在看

以上是关于kube-scheduler 磁盘调度源码分析的主要内容,如果未能解决你的问题,请参考以下文章

k8s调度器启动流程分析 | 视频文字稿

k8s调度器启动流程分析 | 视频文字稿

kubernetes集群之调度系统

云原生 | kubernetes- 资源调度

关于Kubernetes中kube-scheduler的一些笔记

Kubernetes 调度器调度策略分析