基于kubernetes调度框架的自定义调度器实现

Posted 琦彦

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于kubernetes调度框架的自定义调度器实现相关的知识,希望对你有一定的参考价值。

文章目录

基于kubernetes调度框架的自定义调度器实现

kube-scheduler 是 kubernetes 的核心组件之一,主要负责整个集群资源的调度功能,根据特定的调度算法和策略,将 Pod 调度到最优的工作节点上面去,从而更加合理、更加充分的利用集群的资源,这也是我们选择使用 kubernetes 一个非常重要的理由。如果一门新的技术不能帮助企业节约成本、提供效率,我相信是很难推进的。

调度流程

默认情况下,kube-scheduler 提供的默认调度器能够满足我们绝大多数的要求,我们前面和大家接触的示例也基本上用的默认的策略,都可以保证我们的 Pod 可以被分配到资源充足的节点上运行。但是在实际的线上项目中,可能我们自己会比 kubernetes 更加了解我们自己的应用,比如我们希望一个 Pod 只能运行在特定的几个节点上,或者这几个节点只能用来运行特定类型的应用,这就需要我们的调度器能够可控。

kube-scheduler 的主要作用就是根据特定的调度算法和调度策略将 Pod 调度到合适的 Node 节点上去,是一个独立的二进制程序,启动之后会一直监听 API Server,获取到 PodSpec.NodeName 为空的 Pod,对每个 Pod 都会创建一个 binding。

kube-scheduler structrue

这个过程在我们看来好像比较简单,但在实际的生产环境中,需要考虑的问题就有很多了:

  • 如何保证全部的节点调度的公平性?要知道并不是所有节点资源配置一定都是一样的

  • 如何保证每个节点都能被分配资源?

  • 集群资源如何能够被高效利用?

  • 集群资源如何才能被最大化使用?

  • 如何保证 Pod 调度的性能和效率?

  • 用户是否可以根据自己的实际需求定制自己的调度策略?

考虑到实际环境中的各种复杂情况,kubernetes 的调度器采用插件化的形式实现,可以方便用户进行定制或者二次开发,我们可以自定义一个调度器并以插件形式和 kubernetes 进行集成。

kubernetes 调度器的源码位于 kubernetes/pkg/scheduler 中,大体的代码目录结构如下所示:(不同的版本目录结构可能不太一样)

kubernetes/pkg/scheduler
-- scheduler.go         //调度相关的具体实现
|-- algorithm
|   |-- predicates      //节点筛选策略
|   |-- priorities      //节点打分策略
|-- algorithmprovider
|   |-- defaults         //定义默认的调度器

其中 Scheduler 创建和运行的核心程序,对应的代码在 pkg/scheduler/scheduler.go,如果要查看kube-scheduler 的入口程序,对应的代码在 cmd/kube-scheduler/scheduler.go

自定义调度器

一般来说,我们有4种扩展 Kubernetes 调度器的方法。

  1. 一种方法就是直接 clone 官方的 kube-scheduler 源代码,在合适的位置直接修改代码,然后重新编译运行修改后的程序,当然这种方法是最不建议使用的,也不实用,因为需要花费大量额外的精力来和上游的调度程序更改保持一致。

  2. 第二种方法就是和默认的调度程序一起运行独立的调度程序,默认的调度器和我们自定义的调度器可以通过 Pod 的 spec.schedulerName 来覆盖各自的 Pod,默认是使用 default 默认的调度器,但是多个调度程序共存的情况下也比较麻烦,比如当多个调度器将 Pod 调度到同一个节点的时候,可能会遇到一些问题,

    1. 因为很有可能两个调度器都同时将两个 Pod 调度到同一个节点上去,但是很有可能其中一个 Pod 运行后其实资源就消耗完了;
    2. 并且维护一个高质量的自定义调度程序也不是很容易的,因为我们需要全面了解默认的调度程序,整体 Kubernetes 的架构知识以及各种 Kubernetes API 对象的各种关系或限制。
  3. 第三种方法是调度器扩展程序,这个方案目前是一个可行的方案,可以和上游调度程序兼容,所谓的调度器扩展程序其实就是一个可配置的 Webhook 而已,里面包含 过滤器优先级 两个端点,分别对应调度周期中的两个主要阶段(过滤和打分)。在我们现在的 v1.16 版本中上面的 调度器扩展程序 也已经被废弃了,2022年4月1日后也被移除了。

Kubernetes 开始只提供了 Extender ,通过部署一个 Web 服务实现无侵入式扩展 scheduler插件,但其存在以下几个问题:

  1. **Extender 扩展点的数量是有限的:**在调度期间只有“Filter”和“Prioritize”扩展点。 “Preempt”扩展点在运行默认抢占机制后被调用。“Bind”扩展点用于绑定Pod,且 Bind 扩展点只能绑定一个扩展程序,扩展点会替换掉调度器的 bind 操作。Extender 不能在其他点被调用,例如,不能在运行 predicate 函数之前被调用。
  2. **性能问题:**对扩展程序的每次调用都涉及 JSON 的编解码。调用 webhook(HTTP 请求)也比调用原生函数慢。
  3. **调度器无法通知 Extender 已中止 Pod 的调度。**例如,如果一个 Extender 提供了一个集群资源,而调度器联系了 Extender 并要求它为正在调度的 pod 提供资源的实例,然后调度器在调度 pod 时遇到错误并决定中止调度,那么将很难与扩展程序沟通错误并要求它撤消资源的配置。
  4. 由于当前的Extender作为一个单独的进程运行,因此不能使用调度器的缓存。要么从 API Server构建自己的缓存,要么只处理他们从调度器接收到的信息。
  1. 第四种方法是通过调度框架(Scheduling Framework),Kubernetes v1.15 版本中引入了可插拔架构的调度框架,使得定制调度器这个任务变得更加的容易。调库框架向现有的调度器中添加了一组插件化的 API,该 API 在保持调度程序“核心”简单且易于维护的同时,使得大部分的调度功能以插件的形式存在,而且在我们现在的 v1.16 版本中上面的 调度器扩展程序 也已经被废弃了,2022年4月1日后也被移除了,所以以后调度框架才是自定义调度器的核心方式

    Scheduling Framework 调度框架是一组新的“插件”API,通过这些 API 允许将许多调度功能使用插件实现,同时保持调度器“核心”简单和可维护。调度器插件必须用 Go 编写,并使用 Kubernetes 调度器代码编译,有一定学习成本。

这里我们可以简单介绍下调度框架(Scheduling Framework)方式的实现。

调度框架(Scheduling Framework)

调度框架定义了一组扩展点,用户可以实现扩展点定义的接口来定义自己的调度逻辑(我们称之为扩展),并将扩展注册到扩展点上,调度框架在执行调度工作流时,遇到对应的扩展点时,将调用用户注册的扩展。调度框架在预留扩展点时,都是有特定的目的,有些扩展点上的扩展可以改变调度程序的决策方法,有些扩展点上的扩展只是发送一个通知。

设计目标

  1. 使调度器更具可扩展性。
  2. 通过将调度器核心的一些功能移至插件,使调度器核心更简单。
  3. 在框架中提出扩展点。
  4. 提出一种机制来接收插件结果并根据收到的结果继续或中止。
  5. 提出一种机制来处理错误并与插件进行通信。

调度过程和绑定过程

我们知道每当调度一个 Pod 时,都会按照两个过程来执行:调度过程和绑定过程。

调度过程为 Pod 选择一个合适的节点,绑定过程则将调度过程的决策应用到集群中(也就是在被选定的节点上运行 Pod),将调度过程和绑定过程合在一起,称之为调度上下文(scheduling context)。需要注意的是调度过程是同步运行的(同一时间点只为一个 Pod 进行调度),绑定过程可异步运行(同一时间点可并发为多个 Pod 执行绑定)。

调度过程和绑定过程遇到如下情况时会中途退出:

  • 调度程序认为当前没有该 Pod 的可选节点

  • 内部错误

这个时候,该 Pod 将被放回到 待调度队列,并等待下次重试。

扩展点(Extension Points)

参考:https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/

下图展示了 Pod 的调度上下文和调度框架暴露的扩展点。在这张图中,“Filter”相当于“Predicate”“Scoring”相当于“Priority function”

一个插件可以在多个扩展点注册以执行更复杂或有状态的任务。

scheduling framework extensions

  1. QueueSort 扩展用于对 Pod 的待调度队列进行排序,以决定先调度哪个 Pod,QueueSort 扩展本质上只需要实现一个方法 Less(Pod1, Pod2) 用于比较两个 Pod 谁更优先获得调度即可,同一时间点只能有一个 QueueSort 插件生效。

  2. Pre-filter 扩展用于对 Pod 的信息进行预处理,或者检查一些集群或 Pod 必须满足的前提条件,如果 pre-filter 返回了 error,则调度过程终止。

    PreFilter 在每个调度周期中调用一次。这些插件用于预处理有关 pod 的信息,或检查集群、pod 必须满足的某些条件。一个前置过滤器插件应该实现一个PreFilter 接口,如果前置过滤器返回一个错误,则调度周期被中止。

    Pre-filter 插件可以实现可选的 PreFilterExtensions 接口,该接口定义 AddPod 和 RemovePod 方法以增量修改其预处理信息。框架保证这些函数只会在 PreFilter 之后调用,在克隆的 CycleState 上,并且在调用特定节点上的 Filter 之前可能会多次调用这些函数。

  3. Filter 扩展用于排除那些不能运行该 Pod 的节点,对于每一个节点,调度器将按顺序执行 filter 扩展;如果任何一个 filter 将节点标记为不可选,则余下的 filter 扩展将不会被执行。调度器可以同时对多个节点执行 filter 扩展。

  4. Post-filter **在Filter阶段之后被调用,仅在没有为 pod 找到可行节点时调用。**插件按其配置的顺序调用。如果任何 PostFilter 插件将该节点标记为可调度,则不会调用其余插件。一个典型的 PostFilter 实现是抢占,试图通过抢占其他 Pod 来使 Pod可调度。

  5. PreScore 这些插件用于执行 “前置评分(pre-scoring)” 工作,即生成一个可共享状态供 Score 插件使用。 如果 PreScore 插件返回错误,则调度周期将终止。

  6. Score 扩展用于为所有可选节点进行打分,调度器将针对每一个节点调用 Soring 扩展,评分结果是一个范围内的整数。在 normalize scoring 阶段,调度器将会把每个 scoring 扩展对具体某个节点的评分结果和该扩展的权重合并起来,作为最终评分结果。

  7. Normalize scoring 扩展在调度器对节点进行最终排序之前修改每个节点的评分结果,注册到该扩展点的扩展在被调用时,将获得同一个插件中的 scoring 扩展的评分结果作为参数,调度框架每执行一次调度,都将调用所有插件中的一个 normalize scoring 扩展一次。

  8. Reserve 是一个通知性质的扩展点**,有状态的插件可以使用该扩展点来获得节点上为 Pod 预留的资源**,该事件发生在调度器将 Pod 绑定到节点之前,目的是避免调度器在等待 Pod 与节点绑定的过程中调度新的 Pod 到节点上时,发生实际使用资源超出可用资源的情况。(因为绑定 Pod 到节点上是异步发生的)。这是调度过程的最后一个步骤,Pod 进入 reserved 状态以后,要么在绑定失败时触发 Unreserve 扩展,要么在绑定成功时,由 Post-bind 扩展结束绑定过程。

  9. Permit 扩展用于阻止或者延迟 Pod 与节点的绑定。Permit 扩展可以做下面三件事中的一项:

    • approve(批准):当所有的 permit 扩展都 approve 了 Pod 与节点的绑定,调度器将继续执行绑定过程
    • deny(拒绝):如果任何一个 permit 扩展 deny 了 Pod 与节点的绑定,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展
    • wait(等待):如果一个 permit 扩展返回了 wait,则 Pod 将保持在 permit 阶段,直到被其他扩展 approve,如果超时事件发生,wait 状态变成 deny,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展
  1. Pre-bind 扩展用于在 Pod 绑定之前执行某些逻辑。例如,pre-bind 扩展可以将一个基于网络的数据卷挂载到节点上,以便 Pod 可以使用。如果任何一个 pre-bind 扩展返回错误,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展。Bind 扩展用于将 Pod 绑定到节点上:
    • 只有所有的 pre-bind 扩展都成功执行了,bind 扩展才会执行
    • 调度框架按照 bind 扩展注册的顺序逐个调用 bind 扩展
    • 具体某个 bind 扩展可以选择处理或者不处理该 Pod
    • 如果某个 bind 扩展处理了该 Pod 与节点的绑定,余下的 bind 扩展将被忽略
  1. Bind 用于将 Pod 绑定到节点上。直到所有的 PreBind 插件都完成,Bind 插件才会被调用。 各 Bind 插件按照配置顺序被调用。Bind 插件可以选择是否处理指定的 Pod。 如果某 Bind 插件选择处理某 Pod,剩余的 Bind 插件将被跳过
  2. Post-bind 是一个通知性质的扩展:
    • Post-bind 扩展在 Pod 成功绑定到节点上之后被动调用
    • Post-bind 扩展是绑定过程的最后一个步骤,可以用来执行资源清理的动作
  1. Unreserve 是一个通知性质的扩展,如果为 Pod 预留了资源,Pod 又在被绑定过程中被拒绝绑定,则 unreserve 扩展将被调用。Unreserve 扩展应该释放已经为 Pod 预留的节点上的计算资源。在一个插件中,reserve 扩展和 unreserve 扩展应该成对出现。

扩展点源码分析

如果我们要实现自己的插件,必须向调度框架注册插件并完成配置,另外还必须实现扩展点接口对应的扩展点接口我们可以在源码 pkg/scheduler/framework/v1alpha1/interface.go 文件中找到,如下所示:

// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface 
	Name() string


type QueueSortPlugin interface 
	Plugin
	Less(*PodInfo, *PodInfo) bool


// PreFilterPlugin is an interface that must be implemented by "prefilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PreFilterPlugin interface 
	Plugin
	PreFilter(pc *PluginContext, p *v1.Pod) *Status


// FilterPlugin is an interface for Filter plugins. These plugins are called at the
// filter extension point for filtering out hosts that cannot run a pod.
// This concept used to be called 'predicate' in the original scheduler.
// These plugins should return "Success", "Unschedulable" or "Error" in Status.code.
// However, the scheduler accepts other valid codes as well.
// Anything other than "Success" will lead to exclusion of the given host from
// running the pod.
type FilterPlugin interface 
	Plugin
	Filter(pc *PluginContext, pod *v1.Pod, nodeName string) *Status


// PostFilterPlugin is an interface for Post-filter plugin. Post-filter is an
// informational extension point. Plugins will be called with a list of nodes
// that passed the filtering phase. A plugin may use this data to update internal
// state or to generate logs/metrics.
type PostFilterPlugin interface 
	Plugin
	PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status


// ScorePlugin is an interface that must be implemented by "score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface 
	Plugin
	Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status)


// ScoreWithNormalizePlugin is an interface that must be implemented by "score"
// plugins that also need to normalize the node scoring results produced by the same
// plugin's "Score" method.
type ScoreWithNormalizePlugin interface 
	ScorePlugin
	NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status


// ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point. These are meant to update the state of the plugin.
// This concept used to be called 'assume' in the original scheduler.
// These plugins should return only Success or Error in Status.code. However,
// the scheduler accepts other valid codes as well. Anything other than Success
// will lead to rejection of the pod.
type ReservePlugin interface 
	Plugin
	Reserve(pc *PluginContext, p *v1.Pod, nodeName string) *Status


// PreBindPlugin is an interface that must be implemented by "prebind" plugins.
// These plugins are called before a pod being scheduled.
type PreBindPlugin interface 
	Plugin
	PreBind(pc *PluginContext, p *v1.Pod, nodeName string) *Status


// PostBindPlugin is an interface that must be implemented by "postbind" plugins.
// These plugins are called after a pod is successfully bound to a node.
type PostBindPlugin interface 
	Plugin
	PostBind(pc *PluginContext, p *v1.Pod, nodeName string)


// UnreservePlugin is an interface for Unreserve plugins. This is an informational
// extension point. If a pod was reserved and then rejected in a later phase, then
// un-reserve plugins will be notified. Un-reserve plugins should clean up state
// associated with the reserved Pod.
type UnreservePlugin interface 
	Plugin
	Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)


// PermitPlugin is an interface that must be implemented by "permit" plugins.
// These plugins are called before a pod is bound to a node.
type PermitPlugin interface 
	Plugin
	Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration)


// BindPlugin is an interface that must be implemented by "bind" plugins. Bind
// plugins are used to bind a pod to a Node.
type BindPlugin interface 
	Plugin
	Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status

调度框架插件的启用或者禁用

对于调度框架插件的启用或者禁用,我们同样可以使用KubeSchedulerConfiguration](https://godoc.org/k8s.io/kubernetes/pkg/scheduler/apis/config#KubeSchedulerConfiguration) 资源对象来进行配置。

我们可以通过查看官方文档,可以通过 --config 参数指定调度器将使用哪些参数,该配置文件应该包含一个 KubeSchedulerConfiguration 对象,如下所示格式:(/etc/kubernetes/scheduler-extender.yaml)

 # 通过"--config" 传递文件内容
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: "/etc/kubernetes/scheduler.conf"
algorithmSource:
  policy:
    file:
      path: "/etc/kubernetes/scheduler-extender-policy.yaml"  # 指定自定义调度策略文件

我们在这里应该输入的关键参数是 algorithmSource.policy,这个策略文件可以是本地文件也可以是 ConfigMap 资源对象,这取决于调度程序的部署方式,比如我们这里默认的调度器是静态 Pod 方式启动的,所以我们可以用本地文件的形式来配置。

下面的例子中的配置启用了一个实现了 reservepreBind 扩展点的插件,并且禁用了另外一个插件,同时为插件 foo 提供了一些配置信息:

apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration

...

plugins:
  reserve:
    enabled:
    - name: foo
    - name: bar
    disabled:
    - name: baz
  preBind:
    enabled:
    - name: foo
    disabled:
    - name: baz

pluginConfig:
- name: foo
  args: >
    foo插件可以解析的任意内容

扩展的调用顺序如下:

  • 如果某个扩展点没有配置对应的扩展,调度框架将使用默认插件中的扩展

  • 如果为某个扩展点配置且激活了扩展,则调度框架将先调用默认插件的扩展,再调用配置中的扩展

  • 默认插件的扩展始终被最先调用,然后按照 KubeSchedulerConfiguration 中扩展的激活 enabled 顺序逐个调用扩展点的扩展

  • 可以先禁用默认插件的扩展,然后在 enabled 列表中的某个位置激活默认插件的扩展,这种做法可以改变默认插件的扩展被调用时的顺序

假设默认插件 foo 实现了 reserve 扩展点,此时我们要添加一个插件 bar,想要在 foo 之前被调用,则应该先禁用 foo 再按照 bar foo 的顺序激活。示例配置如下所示:

apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration

...

plugins:
  reserve:
    enabled:
    - name: bar
    - name: foo
    disabled:
    - name: foo

在源码目录 pkg/scheduler/framework/plugins/examples 中有几个示范插件,我们可以参照其实现方式。

scheduler示范插件: https://github.com/kubernetes/kubernetes/tree/master/pkg/scheduler/framework/plugins/examples

示例:基于kubernetes调度框架的自定义调度器实现

源码:https://github.com/FLY-Open-K8s/sample-scheduler-framework.git

0. Plugin API

插件 API 有两个步骤。 1. 注册和配置,2. 使用扩展点接口。扩展点接口有以下形式。

type Plugin interface 
   Name() string


type QueueSortPlugin interface 
   Plugin
   Less(*PodInfo, *PodInfo) bool


type PreFilterPlugin interface 
   Plugin
   PreFilter(CycleState, *v1.Pod) *Status


// ... 

1. 依赖问题

因为插件最终的运行方式是独立于默认的scheduler的,所以需要一个main函数作为调度器的入口。我们不需要自己完整地山寨一个scheduler,只需要引用kubernetes scheduler里的部分函数作为入口即可。

但在动手时发现一个问题,kubernetes在设计上会独立发布各个模块,并对外屏蔽部分不稳定的代码,避免主仓库作为一个整体被外部引用,所以将部分模块的依赖版本写成了v0.0.0,然后再用replace替换成了代码仓库里的相对路径,即staging目录里的独立模块。如此一来,直接使用kubernetes代码是没问题的,但是使用go get或者go mod去获取kubernetes主仓库作为依赖时会遇到诸如此类的错误:k8s.io/api@v0.0.0: reading k8s.io/api/go.mod at revision v0.0.0: unknown revision

官方kubernetes仓库并不能直接作为module使用, 所以无法通过go get或是go mod download下载其中的package, 只有某些已发布的子组件可以(应该是指k8s.io/api, k8s.io/kube-controller-manager这种吧).

解决方法是用shell脚本,指定一个kubernetes版本,一方面直接下载官方仓库里被写成v0.0.0版本的各个模块的该版本的代码到本地作为依赖,另一方面修改go mod,将依赖replace成指定的版本,这样版本的需求和供给即实现了匹配。

该shell脚本仓库路径为:hack/get-k8s-as-dep.sh

在项目中执行以下脚本:sh hack/get-k8s-as-dep.sh v1.18.6, 注意替换你需要的版本。这个脚本通过修改go.mod文件保证能够获取相关依赖。

2. 自定义调度器-入口函数

其实要实现一个调度框架的插件,并不难,我们只要实现对应的扩展点,然后将插件注册到调度器中即可,下面是默认调度器在初始化的时候注册的插件

func NewRegistry() Registry 
	return Registry
		// FactoryMap:
		// New plugins are registered here.
		// example:
		// 
		//  stateful_plugin.Name: stateful.NewStatefulMultipointExample,
		//  fooplugin.Name: fooplugin.New,
		// 
	

但是可以看到默认并没有注册一些插件,所以要想让调度器能够识别我们的插件代码,就需要自己来实现一个调度器了,当然这个调度器我们完全没必要完全自己实现,直接调用默认的调度器,然后在上面的 **NewRegistry()** 函数中将我们的插件注册进去即可。

kube-scheduler 的源码文件 kubernetes/cmd/kube-scheduler/app/server.go 中有一个 NewSchedulerCommand 入口函数,其中的参数是一个类型为 Option 的列表,而这个 Option 恰好就是一个插件配置的定义:

// Option configures a framework.Registry.
type Option func(framework.Registry) error

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command 
  ......

所以我们完全就可以直接调用这个函数来作为我们的函数入口,并且传入我们自己实现的插件作为参数即可,而且该文件下面还有一个名为 WithPlugin 的函数可以来创建一个 Option 实例:

// WithPlugin creates an Option based on plugin name and factory.
func WithPlugin(name string, factory framework.PluginFactory) Option 
	return func(registry framework.Registry) error 
		return registry.Register(name, factory)
	

所以最终我们的入口函数如下所示:

func main() 
	rand.Seed(time.Now().UTC().UnixNano())

	command := app.NewSchedulerCommand(
		app.WithPlugin(sample.Name, sample.New), 
	)

	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil 
		_, _ = fmt.Fprintf(os.Stderr, "%v\\n", err)
		os.Exit(1)
	


其中 app.WithPlugin(sample.Name, sample.New) 就是我们接下来要实现的插件,从 WithPlugin 函数的参数也可以看出我们这里的 sample.New 必须是一个 framework.PluginFactory 类型的值,而 PluginFactory 的定义就是一个函数:

type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)

所以 sample.New 实际上就是上面的这个函数,在这个函数中我们可以获取到插件中的一些数据然后进行逻辑处理即可,插件实现如下所示,我们这里只是简单获取下数据打印日志,如果你有实际需求的可以根据获取的数据就行处理即可,我们这里只是实现了 **PreFilter****Filter****PreBind** 三个扩展点,其他的可以用同样的方式来扩展即可:

3. 插件实现

// 插件名称
const Name = "sample-plugin"

type Args struct 
	FavoriteColor  string `json:"favorite_color,omitempty"`
	FavoriteNumber int    `json:"favorite_number,omitempty"`
	ThanksTo       string `json:"thanks_to,omitempty"`


type Sample struct 
	args   *Args
	handle framework.FrameworkHandle


func (s *Sample) Name() string 
	return Name


func (s *Sample) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status 
	klog.V(3).Infof("prefilter pod: %v", pod.Name)
	return framework.NewStatus(framework.Success, "")


func (s *Sample) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status 
	klog.V(3).Infof("filter pod: %v, node: %v", pod.Name, nodeName)
	return framework.NewStatus(framework.Success, "")


func (s *Sample) PreBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status 
	if nodeInfo, ok := s.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]; !ok 
		return framework.NewStatus(framework.Error, fmt.Sprintf("prebind get node info error: %+v", nodeName))
	 else 
		klog.V(3).Infof("prebind node info: %+v", nodeInfo.Node())
		return framework.NewStatus(framework.Success, "")
	


//type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)
func New(configuration *runtime.Unknown, f framework.FrameworkHandle) (framework.Plugin, error) 
	args := &Args
	if err := framework.DecodeInto(configuration, args); err != nil 
		return nil, err
	
	klog.V(3).Infof("get plugin config args: %+v", args)
	return &Sample
		args: args,
		handle: f,
	, nil

完整代码可以前往仓库 https://github.com/cnych/sample-scheduler-framework 获取。

4. 编译打包成镜像

实现完成后,编译打包成镜像即可,然后我们就可以当成普通的应用用一个 Deployment 控制器来部署即可,由于我们需要去获取集群中的一些资源对象,所以当然需要申请 RBAC 权限,然后同样通过 --config 参数来配置我们的调度器,同样还是使用一个 KubeSchedulerConfiguration 资源对象配置,可以通过 plugins 来启用或者禁用我们实现的插件,也可以通过 pluginConfig 来传递一些参数值给插件:

1. 创建ClusterRole–sample-scheduler-clusterrole

2. 创建ServiceAccount–sample-scheduler-sa

3. 创建ClusterRoleBinding–ServiceAccount绑定 名为sample-scheduler-clusterrole的ClusterRole

4. 创建ConfigMap–scheduler-config.yaml 指定插件sample-plugin分别在PreFilter、Filter、PreBind 三个扩展点的启用

5. 部署自定义插件(Deployment方式)

# 1. 创建ClusterRole--sample-scheduler-clusterrole
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: sample-scheduler-clusterrole
rules:
  - apiGroups:
      - ""
    resources:
      - endpoints
      - events
    verbs:
      - create
      - get
      - update
  - apiGroups:
      - 基于scrapy源码实现的自定义微型异步爬虫框架

从零开始入门 K8s | 调度器的调度流程和算法介绍

Kubernetes 调度器调度策略配置修改

Kubernetes 跨集群流量调度实战 :访问控制

Kubernetes入门至精通 | 调度器

作业帮 Kubernetes 原生调度器优化实践