kube-scheduler Disk Scheduling: A Source Code Analysis
Author | leadersnowy
Source | CSDN blog
Introduction to kube-scheduler
First, recall that kube-scheduler's fundamental job is to place each Pod onto the most suitable worker node according to a set of scheduling algorithms.
The overall scheduling flow has two phases:
1. Predicates (filtering):
The input is the full set of nodes; the output is the subset that satisfies the predicates. kube-scheduler filters out any node that fails a predicate policy. For example, a node whose resources are insufficient, or that violates a rule such as "the node's labels must match the Pod's selector", does not pass.
2. Priorities (scoring):
The input is the set of nodes that survived the predicate phase. Each of them is scored and ranked according to the priority policies, and the node with the highest score is chosen. For example, a node with more spare resources and a lighter load tends to rank higher.
Many factors feed into scheduling, including node status (CPU, memory, disk), node-to-pod affinity, and node labels. This analysis covers only the disk-related parts of the source.
Source Code Analysis
2.1 Disk Predicates
The core code for this part lives under pkg/scheduler/framework/plugins/nodevolumelimits/, split into two files: csi.go and non_csi.go. Both follow the same core idea: their Filter method caps the number of volumes of a given type on a node, with the caps shipped as defaults in the Kubernetes source. The CSI variant simply limits how many CSI volumes can be attached to a node. The non-CSI variant is more involved, because it handles both in-tree plugins and FlexVolume, and it configures a separate attachable-volume limit for each in-tree plugin. Below we analyze the non-CSI code in detail.
Without further ado, the code:
2.1.1 nonCSILimits
```go
type nonCSILimits struct {
	name           string
	filter         VolumeFilter
	volumeLimitKey v1.ResourceName
	maxVolumeFunc  func(node *v1.Node) int

	csiNodeLister storagelisters.CSINodeLister
	pvLister      corelisters.PersistentVolumeLister
	pvcLister     corelisters.PersistentVolumeClaimLister
	scLister      storagelisters.StorageClassLister

	// The string below is generated randomly during the struct's initialization.
	// It is used to prefix volumeID generated inside the predicate() method to
	// avoid conflicts with any real volume.
	randomVolumeIDPrefix string
}
```
The core struct is nonCSILimits, with the following fields:
name: the name of this nonCSILimits instance, as the field suggests.
filter: a variable of type VolumeFilter, which bundles four functions: FilterVolume(), FilterPersistentVolume(), MatchProvisioner(), and IsMigrated(). How they are invoked is covered below; the type itself is sketched right after this list.
volumeLimitKey: essentially a string, naming the resource key for this volume type.
maxVolumeFunc: a function that returns the maximum volume count of this type for a given node.
csiNodeLister: lister for CSINode objects.
pvLister: lister for PersistentVolumes.
pvcLister: lister for PersistentVolumeClaims.
scLister: lister for StorageClasses.
randomVolumeIDPrefix: a random string used to build unique volume IDs.
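For reference, the VolumeFilter type that filter refers to looks roughly like this in non_csi.go (signatures inferred from the calls shown below; the exact shape varies slightly across Kubernetes versions):

```go
// VolumeFilter describes how to recognize and identify volumes of one type
// when checking a node's volume cap. Each callback reports whether the object
// is "relevant" (belongs to this volume type) plus a unique ID to count.
type VolumeFilter struct {
	// FilterVolume handles volumes declared inline in a pod spec.
	FilterVolume func(vol *v1.Volume) (id string, relevant bool)
	// FilterPersistentVolume handles PersistentVolumes bound through PVCs.
	FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
	// MatchProvisioner reports whether a StorageClass provisions this type.
	MatchProvisioner func(sc *storage.StorageClass) (relevant bool)
	// IsMigrated reports whether the in-tree plugin has been migrated to a
	// CSI driver on this node.
	IsMigrated func(csiNode *storage.CSINode) bool
}
```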
2.1.2 Initializing nonCSILimits
```go
func newNonCSILimits(
	filterName string,
	csiNodeLister storagelisters.CSINodeLister,
	scLister storagelisters.StorageClassLister,
	pvLister corelisters.PersistentVolumeLister,
	pvcLister corelisters.PersistentVolumeClaimLister,
) framework.Plugin {
	var filter VolumeFilter
	var volumeLimitKey v1.ResourceName
	var name string

	switch filterName {
	case ebsVolumeFilterType:
		name = EBSName
		filter = ebsVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
	case gcePDVolumeFilterType:
		name = GCEPDName
		filter = gcePDVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
	case azureDiskVolumeFilterType:
		name = AzureDiskName
		filter = azureDiskVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
	case indiskVolumeFilterType:
		name = IndiskName
		filter = indiskVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.IndiskVolumeLimitKey)
	default:
		klog.Fatalf("Wrong filterName, Only Support %v %v %v %v %v", ebsVolumeFilterType,
			gcePDVolumeFilterType, azureDiskVolumeFilterType, cinderVolumeFilterType, indiskVolumeFilterType)
		return nil
	}

	pl := &nonCSILimits{
		name:                 name,
		filter:               filter,
		volumeLimitKey:       volumeLimitKey,
		maxVolumeFunc:        getMaxVolumeFunc(filterName),
		csiNodeLister:        csiNodeLister,
		pvLister:             pvLister,
		pvcLister:            pvcLister,
		scLister:             scLister,
		randomVolumeIDPrefix: rand.String(32),
	}

	return pl
}
```
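For context, constructors like this are wrapped in one plugin factory per filter type and registered with the scheduler framework. Here is a hedged sketch of how an Indisk-specific factory could wire up newNonCSILimits, following the pattern of upstream constructors such as NewEBS; the handle interface differs across Kubernetes versions, so treat the exact API as illustrative:

```go
// NewIndisk is a sketch of a plugin factory for the Indisk filter type.
// Names and the framework.Handle API are indicative only.
func NewIndisk(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
	informerFactory := handle.SharedInformerFactory()
	return newNonCSILimits(
		indiskVolumeFilterType,
		informerFactory.Storage().V1().CSINodes().Lister(),
		informerFactory.Storage().V1().StorageClasses().Lister(),
		informerFactory.Core().V1().PersistentVolumes().Lister(),
		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
	), nil
}
```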
Initialization references a number of constants:
```go
const (
	// defaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.
	// GCE instances can have up to 16 PD volumes attached.
	defaultMaxGCEPDVolumes = 16
	// defaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.
	// Larger Azure VMs can actually have much more disks attached.
	// TODO We should determine the max based on VM size.
	defaultMaxAzureDiskVolumes = 16

	// ebsVolumeFilterType defines the filter name for ebsVolumeFilter.
	ebsVolumeFilterType = "EBS"
	// gcePDVolumeFilterType defines the filter name for gcePDVolumeFilter.
	gcePDVolumeFilterType = "GCE"
	// azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.
	azureDiskVolumeFilterType = "AzureDisk"
	// indiskVolumeFilterType defines the filter name for indiskVolumeFilter.
	indiskVolumeFilterType = "Indisk"

	// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
	ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"

	// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet.
	KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
	// MaxIndiskVolumes defines the maximum number of Indisk Volumes per node.
	MaxIndiskVolumes = "MAX_INDISK_VOLUMES"
	// DefaultIndiskVolumes defines the default number of Indisk Volumes per node.
	DefaultIndiskVolumes = "DEFAULT_INDISK_VOLUMES"
)
```
These constants include the name of each in-tree storage type and the default maximum volume counts, and they are consumed when a nonCSILimits is initialized.
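Note that KubeMaxPDVols and MaxIndiskVolumes hold environment-variable names, which suggests the compiled-in defaults can be overridden at runtime. A minimal sketch of such an override helper (upstream uses a similar getMaxVolLimitFromEnv; this simplified version is illustrative only):

```go
package nodevolumelimits

import (
	"os"
	"strconv"

	"k8s.io/klog/v2"
)

// maxVolumeLimitFromEnv is a hypothetical helper: it returns the limit read
// from the named environment variable (e.g. KUBE_MAX_PD_VOLS or
// MAX_INDISK_VOLUMES) when it parses as a positive integer, and otherwise
// falls back to the compiled-in default.
func maxVolumeLimitFromEnv(envName string, defaultLimit int) int {
	if raw := os.Getenv(envName); raw != "" {
		if parsed, err := strconv.Atoi(raw); err == nil && parsed > 0 {
			return parsed
		}
		klog.Errorf("Unable to parse %s=%q; using default limit %d", envName, raw, defaultLimit)
	}
	return defaultLimit
}
```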
Beyond the constants, the key piece of initialization is the filter. Take Cinder as the example: how does the filter recognize that a volume is a Cinder volume?
```go
var cinderVolumeFilter = VolumeFilter{
	FilterVolume: func(vol *v1.Volume) (string, bool) {
		if vol.Cinder != nil {
			return vol.Cinder.VolumeID, true
		}
		return "", false
	},

	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
		if pv.Spec.Cinder != nil {
			return pv.Spec.Cinder.VolumeID, true
		}
		return "", false
	},

	MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
		if sc.Provisioner == csilibplugins.CinderInTreePluginName {
			return true
		}
		return false
	},

	IsMigrated: func(csiNode *storage.CSINode) bool {
		return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
	},
}
```
As you can see, the filter decides whether a volume belongs to this type by inspecting metadata fields such as vol.Cinder, pv.Spec.Cinder, and sc.Provisioner.
For volume types that are not in-tree, other fields serve the same purpose:
For FlexVolume: vol.FlexVolume.Driver and pv.Spec.FlexVolume.Driver.
For CSI: vol.CSI.Driver and pv.Spec.CSI.Driver.
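To make that concrete, here is a hedged sketch of a VolumeFilter keyed on one specific CSI driver. The driver name fastdisk.example.com is hypothetical; also note that inline CSI volumes (vol.CSI) carry no cluster-wide VolumeHandle, so the sketch falls back to the pod-local volume name there:

```go
var fastDiskVolumeFilter = VolumeFilter{
	FilterVolume: func(vol *v1.Volume) (string, bool) {
		if vol.CSI != nil && vol.CSI.Driver == "fastdisk.example.com" {
			// CSIVolumeSource has no VolumeHandle, so the pod-local volume
			// name is the best available key for inline volumes.
			return vol.Name, true
		}
		return "", false
	},

	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
		if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == "fastdisk.example.com" {
			return pv.Spec.CSI.VolumeHandle, true
		}
		return "", false
	},

	MatchProvisioner: func(sc *storage.StorageClass) bool {
		return sc.Provisioner == "fastdisk.example.com"
	},

	// An out-of-tree CSI driver has no in-tree plugin to migrate from.
	IsMigrated: func(csiNode *storage.CSINode) bool {
		return false
	},
}
```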
2.1.3 The Core Method: Filter
```go
// Filter invoked at the filter extension point.
func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	// If a pod doesn't have any volume attached to it, the predicate will always be true.
	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
	if len(pod.Spec.Volumes) == 0 {
		return nil
	}

	newVolumes := make(map[string]bool)
	if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	// quick return
	if len(newVolumes) == 0 {
		return nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))
	}

	var csiNode *storage.CSINode
	var err error
	if pl.csiNodeLister != nil {
		csiNode, err = pl.csiNodeLister.Get(node.Name)
		if err != nil {
			// we don't fail here because the CSINode object is only necessary
			// for determining whether the migration is enabled or not
			klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
		}
	}

	// if a plugin has been migrated to a CSI driver, defer to the CSI predicate
	if pl.filter.IsMigrated(csiNode) {
		return nil
	}

	// count unique volumes
	existingVolumes := make(map[string]bool)
	for _, existingPod := range nodeInfo.Pods {
		if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {
			return framework.NewStatus(framework.Error, err.Error())
		}
	}
	numExistingVolumes := len(existingVolumes)

	// filter out already-mounted volumes
	for k := range existingVolumes {
		delete(newVolumes, k)
	}
	numNewVolumes := len(newVolumes)
	maxAttachLimit := pl.maxVolumeFunc(node)
	volumeLimits := volumeLimits(nodeInfo)
	if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
		maxAttachLimit = int(maxAttachLimitFromAllocatable)
	}

	if numExistingVolumes+numNewVolumes > maxAttachLimit {
		return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
	}

	if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
		nodeInfo.TransientInfo.TransientLock.Lock()
		defer nodeInfo.TransientInfo.TransientLock.Unlock()
		nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
		nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
	}
	return nil
}
```
Now walk through the logic. The method starts with a few checks: does the pod use any volumes at all, and which of those volumes belong to this plugin's type (determined by the type's filter inside filterVolumes). Then comes the core processing flow:
```go
numExistingVolumes := len(existingVolumes)

// filter out already-mounted volumes
for k := range existingVolumes {
	delete(newVolumes, k)
}
numNewVolumes := len(newVolumes)
maxAttachLimit := pl.maxVolumeFunc(node)
volumeLimits := volumeLimits(nodeInfo)
if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
	maxAttachLimit = int(maxAttachLimitFromAllocatable)
}

if numExistingVolumes+numNewVolumes > maxAttachLimit {
	return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
}

if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
	nodeInfo.TransientInfo.TransientLock.Lock()
	defer nodeInfo.TransientInfo.TransientLock.Unlock()
	nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
	nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
}
return nil
```
The logic is straightforward:
numExistingVolumes is the number of unique volumes of this type already on the node.
numNewVolumes is the number of unique new volumes this pod would add.
maxAttachLimit is the maximum number of volumes of this type the node can hold.
If numExistingVolumes + numNewVolumes > maxAttachLimit, the node is marked unschedulable. Otherwise the method returns nil (the deferred unlock releases the transient lock on the way out), and the node passes.
That completes the predicate phase. A toy restatement of the counting rule follows.
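As a runnable toy version of the rule above (simplified: the real code deduplicates volumes by ID via filterVolumes and derives the limit per node; the numbers here are hypothetical):

```go
package main

import "fmt"

func main() {
	numExistingVolumes := 14 // unique volumes of this type already on the node
	numNewVolumes := 3       // unique new volumes the pod would add
	maxAttachLimit := 16     // the node's attach limit for this volume type

	if numExistingVolumes+numNewVolumes > maxAttachLimit {
		// 14 + 3 = 17 > 16, so this node is rejected.
		fmt.Println("unschedulable: node(s) exceed max volume count")
	} else {
		fmt.Println("node passes the volume-count predicate")
	}
}
```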
One more detail deserves attention:
```go
nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
```
AllocatableVolumesCount and RequestedVolumes are stored here precisely so that the scoring (Priorities) phase can use them later.
2.2 Disk Priorities
There are many scoring criteria as well; here we only cover the storage-related ones. The relevant source lives in:
pkg/scheduler/framework/plugins/noderesources/resource_allocation.go
pkg/scheduler/framework/plugins/noderesources/least_allocated.go
2.2.1 resource_allocation
As the name suggests, resource_allocation is about allocating resources, and there are several allocation strategies: least_allocated, most_allocated, and requested_to_capacity_ratio are all such strategies. resource_allocation itself only provides the score method:
```go
func (r *resourceAllocationScorer) score(
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo) (int64, *framework.Status) {
	node := nodeInfo.Node()
	if node == nil {
		return 0, framework.NewStatus(framework.Error, "node not found")
	}
	if r.resourceToWeightMap == nil {
		return 0, framework.NewStatus(framework.Error, "resources not found")
	}
	requested := make(resourceToValueMap, len(r.resourceToWeightMap))
	allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
	for resource := range r.resourceToWeightMap {
		allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
	}
	var score int64

	// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
	if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
		score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
	} else {
		score = r.scorer(requested, allocatable, false, 0, 0)
	}

	if klog.V(10).Enabled() {
		if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
			klog.Infof(
				"%v -> %v: %v, map of allocatable resources %v, map of requested resources %v , allocatable volumes %d, requested volumes %d, score %d",
				pod.Name, node.Name, r.Name,
				allocatable, requested, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,
				nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,
				score,
			)
		} else {
			klog.Infof(
				"%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",
				pod.Name, node.Name, r.Name,
				allocatable, requested, score,
			)
		}
	}
	return score, nil
}
```
The core is a single branch:
```go
if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
	score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
} else {
	score = r.scorer(requested, allocatable, false, 0, 0)
}
```
If the pod has volumes, the BalanceAttachedNodeVolumes feature gate is enabled (it is off by default and is typically switched on via the kube-scheduler flag --feature-gates=BalanceAttachedNodeVolumes=true), and the node carries TransientInfo, then the storage-aware scoring path is taken; otherwise it is skipped.
Look at the last two arguments passed to r.scorer: they are exactly the two values assigned at the end of the predicate phase:
AllocatableVolumesCount, the number of volumes that can still be attached to the node.
RequestedVolumes, the number of volumes this pod requests.
2.2.2 least_allocated
least_allocated means "least allocated resources first": the more free capacity a node has left, the higher it scores. Straight to the source:
```go
type resourceAllocationScorer struct {
	Name                string
	scorer              func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
	resourceToWeightMap resourceToWeightMap
}

func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
	return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
		var nodeScore, weightSum int64
		for resource, weight := range resToWeightMap {
			resourceScore := leastRequestedScore(requested[resource], allocable[resource])
			nodeScore += resourceScore * weight
			weightSum += weight
		}
		return nodeScore / weightSum
	}
}

// The unused capacity is calculated on a scale of 0-MaxNodeScore
// 0 being the lowest priority and `MaxNodeScore` being the highest.
// The more unused resources the higher the score is.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}

	return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}
```
Now the core formula: capacity is the node's allocatable amount of a resource, and requested is the amount requested on it (including this pod).
For example, with capacity=10, requested=3, and framework.MaxNodeScore at its default of 100, the score is (10-3)*100/10 = 70.
Notice, however, that leastResourceScorer never references the volume arguments, even though they are passed in. We can patch it manually to account for them:
```go
if includeVolumes && allocatableVolumes-requestedVolumes > 0 && allocatableVolumes > 0 {
	nodeScore += int64(((allocatableVolumes - requestedVolumes) * int(framework.MaxNodeScore)) / allocatableVolumes)
	weightSum += 1
}
```
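As a quick, runnable sanity check of the arithmetic (a sketch with hypothetical numbers; maxNodeScore stands in for framework.MaxNodeScore, which defaults to 100):

```go
package main

import "fmt"

const maxNodeScore = 100 // stand-in for framework.MaxNodeScore

// leastRequestedScore mirrors the scheduler's formula: more unused
// capacity yields a higher score on a 0-maxNodeScore scale.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return (capacity - requested) * maxNodeScore / capacity
}

func main() {
	// Resource example from the text: capacity=10, requested=3.
	fmt.Println(leastRequestedScore(3, 10)) // 70

	// Volume term added by the manual patch: 16 allocatable volume slots,
	// the pod requests 4, so (16-4)*100/16 = 75 enters with weight 1.
	allocatableVolumes, requestedVolumes := 16, 4
	volScore := int64((allocatableVolumes - requestedVolumes) * maxNodeScore / allocatableVolumes)
	fmt.Println(volScore) // 75
}
```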
That concludes our walkthrough of the storage-related parts of the kube-scheduler source.