Kubernetes Node Controller源码分析之Taint Controller

Posted WaltonWang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kubernetes Node Controller源码分析之Taint Controller相关的知识,希望对你有一定的参考价值。

Author: xidianwangtao@gmail.com

摘要:我认为,Node Controller是Kubernetes几十个Controller中最为重要的Controller之一,其重要程度在Top3,然而这可能也是最为复杂的一个Controller,因此对其的源码分析,我将做一个系列文章,希望能帮助自己有一个深入的理解。本博文主要对Node Controller触发的Taint Controller进行源码分析,看看NoExecuteTaintManager是如何进行Taint Pod Evictions的。

NewNoExecuteTaintManager

Kubernetes Node Controller源码分析之创建篇中提到:

  • PodInformer添加Event Handler时,通过调用taintManager.PodUpdated(oldPod *v1.Pod, newPod *v1.Pod)往tc.podUpdateQueue添加updateItem。
  • NodeInformer添加Event Handler时,通过调用taintManager.NodeUpdated(oldNode *v1.Node, newNode *v1.Node)往tc.nodeUpdateQueue添加updateItem。
  • 当创建NodeController时,如果runTaintManager为true(通过kube-controller-manager的–enable-taint-manager中指定,默认为true),则会通过NewNoExecuteTaintManager来实例化一个Taint Manager。
pkg/controller/node/nodecontroller.go:195

func NewNodeController(..) (*NodeController, error) 
    ...
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs
        AddFunc: func(obj interface) 
            ...
            if nc.taintManager != nil 
                nc.taintManager.PodUpdated(nil, pod)
            
        ,
        ...
    
    ...
     else 
        nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs
            AddFunc: func(originalObj interface) 
                ...
                if nc.taintManager != nil 
                    nc.taintManager.NodeUpdated(nil, node)
                
            ,
            ...
        
    
    ...
    if nc.runTaintManager 
        nc.taintManager = NewNoExecuteTaintManager(kubeClient)
    

    ...

    return nc, nil

因此,创建NodeController时已经配置了监听pod和node的事件,并会将相关数据发送到tc.podUpdateQueue和tc.nodeUpdateQueue,然后由Taint Manager从中取出数据进行处理。在此之前,我们先来看看NewNoExecuteTaintManager是如何实例化一个Taint Manager的。

pkg/controller/node/taint_controller.go:152

func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager 
    ...

    tm := &NoExecuteTaintManager
        client:            c,
        recorder:          recorder,

        // taintedNodes记录每个Node对应的Taint信息。
        taintedNodes:      make(map[string][]v1.Taint),

        // nodeUpdateQueue中取出的updateItem会发送到nodeUpdateChannel,Tait Manager从该Channel中取出对应的node update info。
        nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),

        // podUpdateQueue中取出的updateItem会发送到podUpdateChannel,Tait Manager从该Channel中取出对应的pod update info。
        podUpdateChannel:  make(chan *podUpdateItem, podUpdateChannelSize),

        // Node Controller监听到的node update info会发送到nodeUpdateQueue。
        nodeUpdateQueue: workqueue.New(),

        // Node Controller监听到的pod update info会发送到podUpdateQueue。
        podUpdateQueue:  workqueue.New(),
    

    // CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute deletePodHandler.
    tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))

    return tm

相关的代码分析见里面的代码注释。需要强调的是,我们在这里给tm.taintEvictionQueue注册了函数deletePodHandler,用来通过Taint Eviction时删除pod时调用。Taint Manager Run的时候会通过tc.taintEvictionQueue.AddWork()时创建Worker来执行deletePodHandler

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error 
    return func(args *WorkArgs) error 
        ns := args.NamespacedName.Namespace
        name := args.NamespacedName.Name
        glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
        if emitEventFunc != nil 
            emitEventFunc(args.NamespacedName)
        
        var err error

        // 按照失败重试5次,每次间隔10s的重试机制,调用apiserver的api删除对应的Pod。
        for i := 0; i < retries; i++ 
            err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions)
            if err == nil 
                break
            
            time.Sleep(10 * time.Millisecond)
        
        return err
    

Run

Kubernetes Node Controller源码分析之执行篇中提到,在Node Controller Run的时候,如果runTaintManager为true,则会调用nc.taintManager.Run启动Taint Manager loop。

pkg/controller/node/nodecontroller.go:550

func (nc *NodeController) Run() 
    go func() 
        ...

        if nc.runTaintManager 
            go nc.taintManager.Run(wait.NeverStop)
        

        ...
    ()

接下来,我们来看Taint Manager的Run方法。Node Controller启动的Taint Manager实例其实就是NoExecuteTaintManager,其对应的Run方法代码如下。

pkg/controller/node/taint_controller.go:179

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct) 
    glog.V(0).Infof("Starting NoExecuteTaintManager")

    // Functions that are responsible for taking work items out of the workqueues and putting them into channels.
    // 从tc.nodeUpdateQueue中获取updateItem,并发送到tc.nodeUpdateChannel。
    go func(stopCh <-chan struct) 
        for 
            item, shutdown := tc.nodeUpdateQueue.Get()
            if shutdown 
                break
            
            nodeUpdate := item.(*nodeUpdateItem)
            select 
            case <-stopCh:
                break
            case tc.nodeUpdateChannel <- nodeUpdate:
            
        
    (stopCh)

    // 从tc.podUpdateQueue中获取updateItem,并发送到tc.podUpdateChannel。
    go func(stopCh <-chan struct) 
        for 
            item, shutdown := tc.podUpdateQueue.Get()
            if shutdown 
                break
            
            podUpdate := item.(*podUpdateItem)
            select 
            case <-stopCh:
                break
            case tc.podUpdateChannel <- podUpdate:
            
        
    (stopCh)

    // When processing events we want to prioritize Node updates over Pod updates,
    // as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
    // we don't want user (or system) to wait until PodUpdate queue is drained before it can
    // start evicting Pods from tainted Nodes.
    for 
        select 
        case <-stopCh:
            break

        // 从tc.nodeUpdateChannel获取nodeUpdate数据,然后invoke tc.handleNodeUpdate进行处理。
        case nodeUpdate := <-tc.nodeUpdateChannel:
            tc.handleNodeUpdate(nodeUpdate)

        // 从tc.podUpdateChannel获取podUpdate数据,在invoke tc.handlePodUpdate进行处理之前,先确保tc.nodeUpdateQueue中的数据已经被处理完。
        case podUpdate := <-tc.podUpdateChannel:

        // If we found a Pod update we need to empty Node queue first.
        priority:
            for 
                select 
                case nodeUpdate := <-tc.nodeUpdateChannel:
                    tc.handleNodeUpdate(nodeUpdate)
                default:
                    break priority
                
            

            // After Node queue is emptied we process podUpdate.
            tc.handlePodUpdate(podUpdate)
        
    

可见, Run方法中分别从对应的queue中取出数据,然后调用tc.handleNodeUpdatetc.handlePodUpdate进行处理。

// pkg/controller/node/taint_controller.go:365

func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) 
    // Delete
    // 如果nodeUpdate.newNode == nil,则表明该Node被删除了,那么将该Node的Taints信息从tc.taintedNodes缓存中删除。
    if nodeUpdate.newNode == nil 
        node := nodeUpdate.oldNode
        glog.V(4).Infof("Noticed node deletion: %#v", node.Name)
        tc.taintedNodesLock.Lock()
        defer tc.taintedNodesLock.Unlock()
        delete(tc.taintedNodes, node.Name)
        return
    

    // Create or Update
    // 如果是Node Create或者Node Update Event,则更新tc.taintedNodes缓存中记录的该Node的Taints信息。
    glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
    node := nodeUpdate.newNode
    taints := nodeUpdate.newTaints
    func() 
        tc.taintedNodesLock.Lock()
        defer tc.taintedNodesLock.Unlock()
        glog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
        if len(taints) == 0 
            delete(tc.taintedNodes, node.Name)
         else 
            tc.taintedNodes[node.Name] = taints
        
    ()

    // 然后,获取该Node上所有pods list。
    pods, err := getPodsAssignedToNode(tc.client, node.Name)
    if err != nil 
        glog.Errorf(err.Error())
        return
    
    if len(pods) == 0 
        return
    


    // Short circuit, to make this controller a bit faster.
    // 如果该Node上的Taints被删除了,则取消所有该node上的pod evictions。
    if len(taints) == 0 
        glog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
        for i := range pods 
            tc.cancelWorkWithEvent(types.NamespacedNameNamespace: pods[i].Namespace, Name: pods[i].Name)
        
        return
    

    // 否则,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。
    now := time.Now()
    for i := range pods 
        pod := &pods[i]
        podNamespacedName := types.NamespacedNameNamespace: pod.Namespace, Name: pod.Name
        tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
    

handleNodeUpdate的逻辑为:

  • 如果nodeUpdate.newNode == nil,则表明该Node被删除了,那么将该Node的Taints信息从tc.taintedNodes缓存中删除。
  • 如果是Node Create或者Node Update Event,则更新tc.taintedNodes缓存中记录的该Node的Taints信息。
    • 获取该Node上所有pods list。
    • 如果该Node上的Taints被删除了,则取消所有该node上的pod evictions。
    • 否则,遍历pods list中的每个pod,分别调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。
// pkg/controller/node/taint_controller.go:334

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) 
    // Delete
    // 如果podUpdate.newPod == nil,则表明该Pod被删除了,那么取消该Pod Evictions。
    if podUpdate.newPod == nil 
        pod := podUpdate.oldPod
        podNamespacedName := types.NamespacedNameNamespace: pod.Namespace, Name: pod.Name
        glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
        tc.cancelWorkWithEvent(podNamespacedName)
        return
    

    // Create or Update
    // 如果是Pod Create或者Pod Update Event,则取出该pod的node上的Taints info。
    pod := podUpdate.newPod
    podNamespacedName := types.NamespacedNameNamespace: pod.Namespace, Name: pod.Name
    glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
    nodeName := pod.Spec.NodeName
    if nodeName == "" 
        return
    
    taints, ok := func() ([]v1.Taint, bool) 
        tc.taintedNodesLock.Lock()
        defer tc.taintedNodesLock.Unlock()
        taints, ok := tc.taintedNodes[nodeName]
        return taints, ok
    ()
    // It's possible that Node was deleted, or Taints were removed before, which triggered
    // eviction cancelling if it was needed.
    if !ok 
        return
    

    // 然后,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。
    tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now())

handlePodUpdate的逻辑为:

  • 如果podUpdate.newPod == nil,则表明该Pod被删除了,那么取消该Pod Evictions。
  • 如果是Pod Create或者Pod Update Event,则取出该pod的node上的Taints info。
    • 如果node上的Taints info信息为空,表明Taints info被删除了或者Node被删除了,那么就不需要处理该node上的pod eviction了,流程结束。
    • 否则,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。

因此,不管是handlePodUpdate还是handleNodeUpdate,最终都是通过processPodOnNode来处理Pod Eviction的。

pkg/controller/node/taint_controller.go:295

func (tc *NoExecuteTaintManager) processPodOnNode(
    podNamespacedName types.NamespacedName,
    nodeName string,
    tolerations []v1.Toleration,
    taints []v1.Taint,
    now time.Time,
) 

    // 如果该node的taints info为空,则取消Taint Eviction Pods。
    if len(taints) == 0 
        tc.cancelWorkWithEvent(podNamespacedName)
    

    // 对比node的taints info和pod tolerations info,判断出node的taints是否都能被pod所能容忍。
    allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)

    // 如果不是全部都能容忍,那么调用立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。
    if !allTolerated 
        glog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
        // We're canceling scheduled work (if any), as we're going to delete the Pod right away.
        tc.cancelWorkWithEvent(podNamespacedName)
        tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
        return
    

    // 否则,取pod的所有tolerations的TolerationSeconds的最小值作为minTolerationTime。如果某个Toleration没有设置TolerationSeconds,则表示0,如果设置的值为负数,则用0替代。
    minTolerationTime := getMinTolerationTime(usedTolerations)
    // getMinTolerationTime returns negative value to denote infinite toleration.
    if minTolerationTime < 0 
        glog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
        return
    

    startTime := now
    triggerTime := startTime.Add(minTolerationTime)

    // 从tc.taintEvictionQueue中获取Worker-scheduledEviction
    scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())

    // 如果获取到不为空的scheduledEviction,则判断worker创建时间加上minTolerationTime是否达到触发时间要求,如果没达到,则不进行Taint Pod Eviction,流程结束。
    if scheduledEviction != nil 
        startTime = scheduledEviction.CreatedAt
        if startTime.Add(minTolerationTime).Before(triggerTime) 
            return
         else 
            tc.cancelWorkWithEvent(podNamespacedName)
        
    

    // 如果达到触发时间要求,则取消worker,并立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。
    tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)

processPodOnNode的逻辑为:

  • 如果该node的taints info为空,则取消Taint Eviction Pods。
  • 对比node的taints info和pod tolerations info,判断出node的taints是否都能被pod所能容忍。
  • 如果不是全部都能容忍,那么调用立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。
  • 否则,取pod的所有tolerations的TolerationSeconds的最小值作为minTolerationTime。如果某个Toleration没有设置TolerationSeconds,则表示0。
    • 如果minTolerationTime小于0,则永远容忍,流程结束。
    • 从tc.taintEvictionQueue中获取Worker-scheduledEviction。
      • 如果获取到不为空的scheduledEviction,则判断worker创建时间加上minTolerationTime是否达到触发时间要求,如果没达到,则不进行Taint Pod Eviction,流程结束。
      • 如果达到触发时间要求,则取消worker,并立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。

以上是关于Kubernetes Node Controller源码分析之Taint Controller的主要内容,如果未能解决你的问题,请参考以下文章

Kubernetes-集群扩容增加node节点

Kubernetes命令kubectl 在Node节点上的使用

浅谈kubernetes:master节点和node节点

Node.js & Kubernetes Graceful Shutdown

Kubernetes容器集群管理环境 - Node节点的移除与加入

Kubernetes集群Node管理