Kubernetes Node Controller Source Code Analysis: Execution

Posted by WaltonWang

Author: xidianwangtao@gmail.com

Abstract: In my view, the Node Controller is one of the most important of the dozens of controllers in Kubernetes, easily in the top three, and it is probably also the most complex one. I am therefore writing a series of posts analyzing its source code, hoping to build a thorough yet approachable understanding of it. This post takes the NodeController's Run method as the entry point and traces how it works.

Execution of the Node Controller

The Node Controller's Run method is shown below; it is the entry point for all of the Node Controller's real processing logic.

pkg/controller/node/nodecontroller.go:550

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
    go func() {
        defer utilruntime.HandleCrash()

        if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
            utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
            return
        }

        // Incorporate the results of node status pushed from kubelet to master.
        go wait.Until(func() {
            if err := nc.monitorNodeStatus(); err != nil {
                glog.Errorf("Error monitoring node status: %v", err)
            }
        }, nc.nodeMonitorPeriod, wait.NeverStop)

        if nc.runTaintManager {
            go nc.taintManager.Run(wait.NeverStop)
        }

        if nc.useTaintBasedEvictions {
            // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
            // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
            go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
        } else {
            // Managing eviction of nodes:
            // When we delete pods off a node, if the node was not empty at the time we then
            // queue an eviction watcher. If we hit an error, retry deletion.
            go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
        }
    }()
}
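All of these periodic passes are scheduled with wait.Until from k8s.io/apimachinery/pkg/util/wait, which simply re-runs a function at a fixed period until the stop channel is closed. A minimal standalone sketch of that pattern (not taken from the controller; the 1s period and the printed message are made up for illustration):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // Re-run the function every second until stopCh is closed. This is exactly how
    // monitorNodeStatus, doTaintingPass and doEvictionPass are driven in Run().
    go wait.Until(func() {
        fmt.Println("periodic pass at", time.Now())
    }, 1*time.Second, stopCh)

    time.Sleep(5 * time.Second)
    close(stopCh)
}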

WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced)

  • The Node Controller first calls WaitForCacheSync and waits until the HasSynced functions of the PodInformer, NodeInformer, and DaemonSetInformer all return true, i.e. until the caches for these three API objects have finished syncing.
vendor/k8s.io/client-go/tools/cache/shared_informer.go:100

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the contoller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {

    // Every 100ms, iterate over the InformerSynced functions in cacheSyncs.
    // Once all of the required sync functions return true, meaning all of the
    // required caches have synced, WaitForCacheSync returns true;
    // otherwise it keeps polling.
    err := wait.PollUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        glog.V(2).Infof("stop requested")
        return false
    }

    glog.V(4).Infof("caches populated")
    return true
}

The logic of WaitForCacheSync is:

  • Every 100ms, iterate over the InformerSynced functions in cacheSyncs; once all of the required sync functions return true (meaning all required caches have synced), WaitForCacheSync returns true.

  • Otherwise it keeps polling at the 100ms interval until it either returns true or a stop signal is received.
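For reference, the caller-side pattern is always the same: build the informers, start them, and block on WaitForCacheSync before running any controller logic. A minimal sketch using client-go (the kubeconfig path and resync period are placeholders, and the exact package layout may differ from the vendored tree analyzed here):

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    nodeInformer := factory.Core().V1().Nodes().Informer()
    podInformer := factory.Core().V1().Pods().Informer()

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)

    // Block until both caches have completed their initial LIST, bail out otherwise.
    if !cache.WaitForCacheSync(stopCh, nodeInformer.HasSynced, podInformer.HasSynced) {
        panic("timed out waiting for caches to sync")
    }
    fmt.Println("caches populated, controller logic can start")
}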

Start a goroutine that runs monitorNodeStatus every 5s (nc.nodeMonitorPeriod) to monitor node status

pkg/controller/node/nodecontroller.go:586

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {

    // We are listing nodes from local cache as we can tolerate some small delays
    // comparing to state from etcd and there is eventual consistency anyway.
    nodes, err := nc.nodeLister.List(labels.Everything())
    if err != nil {
        return err
    }

    // Compare knownNodeSet with the listed nodes to obtain the added and deleted node lists.
    added, deleted := nc.checkForNodeAddedDeleted(nodes)

    // Iterate over the added node list; each entry is a new node the Node Controller has observed joining the cluster.
    for i := range added {
        ...

        // Add the node to knownNodeSet.
        nc.knownNodeSet[added[i].Name] = added[i]

        // When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
        zone := utilnode.GetZoneKey(added[i])
        if _, found := nc.zoneStates[zone]; !found {

            // Mark the new zone's state as "Initial".
            nc.zoneStates[zone] = stateInitial

            // If useTaintBasedEvictions is false (set via --feature-gates, default TaintBasedEvictions=false),
            // create the zone's zonePodEvictor with evictionLimiterQPS (--node-eviction-rate, default 0.1).
            if !nc.useTaintBasedEvictions {
                nc.zonePodEvictor[zone] =
                    NewRateLimitedTimedQueue(
                        flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
            } else {

                // If useTaintBasedEvictions is true,
                // create the zone's zoneNotReadyOrUnreachableTainer with evictionLimiterQPS.
                nc.zoneNotReadyOrUnreachableTainer[zone] =
                    NewRateLimitedTimedQueue(
                        flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
            }
            ...
        }

        // If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the node's taints
        // (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable),
        // and remove the node from the zoneNotReadyOrUnreachableTainer queue if it is in that queue.
        if nc.useTaintBasedEvictions {
            nc.markNodeAsHealthy(added[i])
        } else {

            // If useTaintBasedEvictions is false, i.e. zonePodEvictor is used,
            // remove the node from its zonePodEvictor queue.
            nc.cancelPodEviction(added[i])
        }
    }

    // Iterate over the deleted node list and remove those nodes from knownNodeSet.
    for i := range deleted {
        ...
        delete(nc.knownNodeSet, deleted[i].Name)
    }

    zoneToNodeConditions := map[string][]*v1.NodeCondition{}
    for i := range nodes {
        ...
        // PollImmediate tries a condition func until it returns true, an error, or the timeout is reached.
        // retrySleepTime is 20ms, so the timeout is 100ms.
        if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {

            // nc.tryUpdateNodeStatus - For a given node checks its conditions and tries to update it.
            // Returns grace period to which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
            gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
            if err == nil {
                return true, nil
            }
            ...
        }); err != nil {
            ...
        }

        // For non-master nodes, append the node's NodeCondition to the zoneToNodeConditions map.
        // We do not treat a master node as a part of the cluster for network disruption checking.
        if !system.IsMasterNode(node.Name) {
            zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
        }

        decisionTimestamp := nc.now()
        if currentReadyCondition != nil {

            // When the observed node condition is NotReady, handle it according to useTaintBasedEvictions.
            // Check eviction timeout against decisionTimestamp
            if observedReadyCondition.Status == v1.ConditionFalse {

                // When useTaintBasedEvictions is true:
                if nc.useTaintBasedEvictions {

                    // If the node is already tainted with the unreachable taint, swap it for the notReady taint.
                    // We want to update the taint straight away if Node is already tainted with the UnreachableTaint
                    if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
                        taintToAdd := *NotReadyTaintTemplate
                        if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
                            ...
                        }

                    // Otherwise add the node to the Tainter queue, to be handled by the tainting pass.
                    } else if nc.markNodeForTainting(node) {
                        ...
                    }

                // When useTaintBasedEvictions is false, pod eviction is used instead.
                } else {

                    // Evict only once decisionTimestamp (now) is later than readyTransitionTimestamp + podEvictionTimeout (default 5min).
                    if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {

                        // Add the node to the Pod Evictor queue, to be handled by the eviction pass.
                        if nc.evictPods(node) {
                            ...
                        }
                    }
                }
            }

            // Similarly, when the observed node condition is Unknown, handle it according to useTaintBasedEvictions.
            if observedReadyCondition.Status == v1.ConditionUnknown {

                // When useTaintBasedEvictions is true:
                if nc.useTaintBasedEvictions {

                    // If the node is already tainted with the notReady taint, swap it for the unreachable taint.
                    // We want to update the taint straight away if Node is already tainted with the UnreachableTaint
                    if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
                        taintToAdd := *UnreachableTaintTemplate
                        if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
                            ...
                        }

                    // Otherwise add the node to the Tainter queue, to be handled by the tainting pass.
                    } else if nc.markNodeForTainting(node) {
                        ...
                    }

                // When useTaintBasedEvictions is false, pod eviction is used instead.
                } else {

                    // Evict only once decisionTimestamp (now) is later than probeTimestamp + podEvictionTimeout (default 5min).
                    if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {

                        // Add the node to the Pod Evictor queue, to be handled by the eviction pass.
                        if nc.evictPods(node) {
                            ...
                        }
                    }
                }
            }

            // Similarly, when the observed node condition is True, handle it according to useTaintBasedEvictions.
            if observedReadyCondition.Status == v1.ConditionTrue {

                // When useTaintBasedEvictions is true,
                if nc.useTaintBasedEvictions {

                    // remove the node from the zoneNotReadyOrUnreachableTainer queue (if it is in that queue).
                    removed, err := nc.markNodeAsHealthy(node)
                    ...
                } else {

                    // When useTaintBasedEvictions is false, remove the node from its zonePodEvictor queue.
                    if nc.cancelPodEviction(node) {
                        ...
                    }
                }
            }

            // If the node status transitioned from Ready to NotReady, mark the status of all pods on the node as NotReady.
            // Report node event.
            if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
                recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
                if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
                    utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
                }
            }

            // Check with the cloud provider to see if the node still exists. If it
            // doesn't, delete the node immediately.
            ...
        }
    }

    // Handle disruption.
    nc.handleDisruption(zoneToNodeConditions, nodes)

    return nil
}

To summarize, the main logic of monitorNodeStatus is:

  • Compare knownNodeSet with the listed nodes to obtain the added and deleted node lists.
  • Iterate over the added node list; each entry is a new node the Node Controller has observed joining the cluster:
    • Add the node to knownNodeSet.
    • When adding new nodes, check whether a new zone has appeared, and if so add a new evictor for it:
      • Mark the new zone's state as "Initial".
      • If useTaintBasedEvictions is false (set via --feature-gates, default TaintBasedEvictions=false), create the zone's zonePodEvictor with evictionLimiterQPS (--node-eviction-rate, default 0.1).
      • If useTaintBasedEvictions is true, create the zone's zoneNotReadyOrUnreachableTainer with evictionLimiterQPS.
    • If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the node's taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable), and remove the node from the zoneNotReadyOrUnreachableTainer queue if it is in that queue.
    • If useTaintBasedEvictions is false, i.e. zonePodEvictor is used, remove the node from its zonePodEvictor queue.
  • Iterate over the deleted node list and remove those nodes from knownNodeSet.
  • Iterate over all nodes:
    • Update the node status, obtaining the last observed NodeCondition and the current NodeCondition.
    • For non-master nodes, append the node's NodeCondition to the zoneToNodeConditions map.
    • When the observed condition is NotReady, handle it according to useTaintBasedEvictions:
      • If useTaintBasedEvictions is true:
        • If the node is already tainted with the unreachable taint, swap it for the notReady taint.
        • Otherwise add the node to the Tainter queue, to be handled by the tainting pass.
      • If useTaintBasedEvictions is false, pod eviction is used instead:
        • Act only once decisionTimestamp (now) is later than readyTransitionTimestamp + podEvictionTimeout (default 5min); see the sketch after this list.
        • Add the node to the Pod Evictor queue, to be handled by the eviction pass.
    • Similarly, when the observed condition is Unknown, handle it according to useTaintBasedEvictions:
      • If useTaintBasedEvictions is true:
        • If the node is already tainted with the notReady taint, swap it for the unreachable taint.
        • Otherwise add the node to the Tainter queue, to be handled by the tainting pass.
      • If useTaintBasedEvictions is false, pod eviction is used instead:
        • Act only once decisionTimestamp (now) is later than probeTimestamp + podEvictionTimeout (default 5min).
        • Add the node to the Pod Evictor queue, to be handled by the eviction pass.
    • Similarly, when the observed condition is True, handle it according to useTaintBasedEvictions:
      • If useTaintBasedEvictions is true, remove the node from the zoneNotReadyOrUnreachableTainer queue (if it is in that queue).
      • If useTaintBasedEvictions is false, remove the node from its zonePodEvictor queue.
    • If the node status transitioned from Ready to NotReady, mark the status of all pods on the node as NotReady.
  • Call handleDisruption.
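The per-node eviction decision above boils down to a timestamp comparison. The helper below is hypothetical (shouldEvict does not exist in the source); it only illustrates the decisionTimestamp check against readyTransitionTimestamp/probeTimestamp plus podEvictionTimeout:

package main

import (
    "fmt"
    "time"
)

// shouldEvict illustrates the check in monitorNodeStatus: a NotReady/Unknown node only
// becomes an eviction candidate once it has stayed in that state longer than
// podEvictionTimeout (default 5 minutes).
func shouldEvict(lastTransition time.Time, podEvictionTimeout time.Duration, now time.Time) bool {
    return now.After(lastTransition.Add(podEvictionTimeout))
}

func main() {
    lastTransition := time.Now().Add(-6 * time.Minute)
    fmt.Println(shouldEvict(lastTransition, 5*time.Minute, time.Now())) // true: NotReady for 6min > 5min
    fmt.Println(shouldEvict(time.Now(), 5*time.Minute, time.Now()))     // false: just transitioned
}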

Next, let's look at the logic of handleDisruption:

pkg/controller/node/nodecontroller.go:772


func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
    newZoneStates := map[string]zoneState{}

    // Based on zoneToNodeConditions, decide whether allAreFullyDisrupted is true
    // (i.e. whether, judging from the currently observed node conditions, every zone in the cluster is currently in the FullDisruption state).
    allAreFullyDisrupted := true
    for k, v := range zoneToNodeConditions {
        ZoneSize.WithLabelValues(k).Set(float64(len(v)))
        unhealthy, newState := nc.computeZoneStateFunc(v)
        ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
        UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
        if newState != stateFullDisruption {
            allAreFullyDisrupted = false
        }
        newZoneStates[k] = newState
        if _, had := nc.zoneStates[k]; !had {
            glog.Errorf("Setting initial state for unseen zone: %v", k)
            nc.zoneStates[k] = stateInitial
        }
    }

    // Based on nc.zoneStates, decide whether allWasFullyDisrupted is true
    // (i.e. whether, judging from the previously observed node conditions, every zone in the cluster was in the FullDisruption state).
    allWasFullyDisrupted := true
    for k, v := range nc.zoneStates {
        if _, have := zoneToNodeConditions[k]; !have {
            ZoneSize.WithLabelValues(k).Set(0)
            ZoneHealth.WithLabelValues(k).Set(100)
            UnhealthyNodes.WithLabelValues(k).Set(0)
            delete(nc.zoneStates, k)
            continue
        }
        if v != stateFullDisruption {
            allWasFullyDisrupted = false
            break
        }
    }

    // At least one node was responding in previous pass or in the current pass. Semantics is as follows:
    // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
    // - if the new state is "normal" we resume normal operation (go back to default limiter settings),
    // - if new state is "fullDisruption" we restore normal eviction rate,
    //   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
    if !allAreFullyDisrupted || !allWasFullyDisrupted {

        // If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster has just
        // switched from non-FullDisruption to FullDisruption: iterate over all nodes.
        // We're switching to full disruption mode
        if allAreFullyDisrupted {
            glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
            for i := range nodes {
                // If useTaintBasedEvictions is true, mark the node as healthy
                // (remove the taints from the node and remove it from the Tainter queue).
                if nc.useTaintBasedEvictions {
                    _, err := nc.markNodeAsHealthy(nodes[i])
                    if err != nil {
                        glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
                    }
                } else {
                    // If useTaintBasedEvictions is false, cancel pod eviction for the node
                    // (by removing it from the zone's Pod Evictor queue).
                    nc.cancelPodEviction(nodes[i])
                }
            }

            // Set the rate limiter of every zone's Tainter queue or Pod Evictor queue to 0, i.e. stop all evictions.
            // We stop all evictions.
            for k := range nc.zoneStates {
                if nc.useTaintBasedEvictions {
                    nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
                } else {
                    nc.zonePodEvictor[k].SwapLimiter(0)
                }
            }

            // Update every zone's state (nc.zoneStates) to FullDisruption.
            for k := range nc.zoneStates {
                nc.zoneStates[k] = stateFullDisruption
            }
            // All rate limiters are updated, so we can return early here.
            return
        }

        // If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster has just
        // switched from FullDisruption to non-FullDisruption: iterate over all nodes.
        // We're exiting full disruption mode
        if allWasFullyDisrupted {
            glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")

            // When exiting disruption mode update probe timestamps on all Nodes.
            now := nc.now()
            for i := range nodes {
                v := nc.nodeStatusMap[nodes[i].Name]
                v.probeTimestamp = now
                v.readyTransitionTimestamp = now
                nc.nodeStatusMap[nodes[i].Name] = v
            }

            // Reset each zone's disruption rate limiter according to the zone's size and state.
            // We reset all rate limiters to settings appropriate for the given state.
            for k := range nc.zoneStates {
                nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
                nc.zoneStates[k] = newZoneStates[k]
            }
            return
        }

        // If both allWasFullyDisrupted and allAreFullyDisrupted are false, the cluster stays in
        // non-FullDisruption: reset each zone's disruption rate limiter according to its size and state.
        // We know that there's at least one not-fully disrupted so,
        // we can use default behavior for rate limiters
        for k, v := range nc.zoneStates {
            newState := newZoneStates[k]
            if v == newState {
                continue
            }
            glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
            nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
            nc.zoneStates[k] = newState
        }
    }
}

The logic of handleDisruption is therefore:

  • Based on zoneToNodeConditions, decide whether allAreFullyDisrupted is true (i.e. whether, judging from the currently observed node conditions, every zone in the cluster is currently in the FullDisruption state); a sketch of how a zone's state can be classified follows this list.
  • Based on nc.zoneStates, decide whether allWasFullyDisrupted is true (i.e. whether, judging from the previously observed node conditions, every zone in the cluster was in the FullDisruption state).
  • If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster has just switched from non-FullDisruption to FullDisruption (it is entering full disruption mode); iterate over all nodes:
    • If useTaintBasedEvictions is true, mark the node as healthy (remove the taints from the node and remove it from the Tainter queue).
    • If useTaintBasedEvictions is false, cancel pod eviction for the node (by removing it from the zone's Pod Evictor queue).
    • Set the rate limiter of every zone's Tainter queue or Pod Evictor queue to 0, i.e. stop all evictions.
    • Update every zone's state (nc.zoneStates) to FullDisruption.
  • If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster has just switched from FullDisruption to non-FullDisruption (it is exiting disruption mode); iterate over all nodes:
    • Update the probe timestamps on all nodes.
    • Call setLimiterInZone to reset each zone's disruption rate limiter according to the zone's size and state.
  • If both allWasFullyDisrupted and allAreFullyDisrupted are false, the cluster stays in non-FullDisruption; call setLimiterInZone to reset each zone's disruption rate limiter according to the zone's size and state.
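How a zone ends up in the Normal, PartialDisruption, or FullDisruption state is decided by nc.computeZoneStateFunc from the Ready conditions collected for that zone. The classifier below is only a sketch patterned after that behavior; the threshold handling is an assumption (--unhealthy-zone-threshold, default 0.55), not a copy of the source:

package main

import "fmt"

type zoneState string

const (
    stateNormal            zoneState = "Normal"
    statePartialDisruption zoneState = "PartialDisruption"
    stateFullDisruption    zoneState = "FullDisruption"
)

// classifyZone is an illustrative stand-in for nc.computeZoneStateFunc:
// no node ready at all -> FullDisruption; a large enough fraction not ready
// -> PartialDisruption; otherwise Normal.
func classifyZone(ready, notReady int, unhealthyZoneThreshold float32) zoneState {
    switch {
    case ready == 0 && notReady > 0:
        return stateFullDisruption
    case notReady > 2 && float32(notReady)/float32(ready+notReady) >= unhealthyZoneThreshold:
        return statePartialDisruption
    default:
        return stateNormal
    }
}

func main() {
    fmt.Println(classifyZone(0, 10, 0.55)) // FullDisruption: nothing is ready
    fmt.Println(classifyZone(4, 6, 0.55))  // PartialDisruption: 60% of the zone is not ready
    fmt.Println(classifyZone(9, 1, 0.55))  // Normal
}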

Next, let's look at the logic of setLimiterInZone and how it maps a zone's size and state to different rate limiters.

pkg/controller/node/nodecontroller.go:870

func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
    switch state {

    // If the zone state is Normal, set the rate limiter to evictionLimiterQPS (default 0.1).
    case stateNormal:
        if nc.useTaintBasedEvictions {
            nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
        }

    // If the zone state is PartialDisruption, set the rate limiter via nc.enterPartialDisruptionFunc.
    case statePartialDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        }

    // If the zone state is FullDisruption, set the rate limiter via nc.enterFullDisruptionFunc.
    case stateFullDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        }
    }
}

nc.enterFullDisruptionFunc and nc.enterPartialDisruptionFunc are assigned when the Node Controller is created via NewNodeController:

pkg/controller/node/nodecontroller.go:270
func NewNodeController(...) (*NodeController, error) {
    ...
    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    ...
}

pkg/controller/node/nodecontroller.go:1132

// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
    return nc.evictionLimiterQPS
}

// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
    if int32(nodeNum) > nc.largeClusterThreshold {
        return nc.secondaryEvictionLimiterQPS
    }
    return 0
}

The logic of setLimiterInZone is therefore (a worked example with the default rates follows this list):

  • When the zone state is PartialDisruption, the rate limiter of the Tainter queue or Pod Evictor queue is set to:
    • secondaryEvictionLimiterQPS (default 0.01) if the zone size is greater than nc.largeClusterThreshold (default 50),
    • 0 otherwise.
  • When the zone state is FullDisruption, the rate limiter of the Tainter queue or Pod Evictor queue is set to evictionLimiterQPS (default 0.1).
  • When the zone state is Normal, the rate limiter of the Tainter queue or Pod Evictor queue is set to evictionLimiterQPS (default 0.1).
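With the default flags (--node-eviction-rate=0.1, --secondary-node-eviction-rate=0.01, --large-cluster-size-threshold=50) this gives the effective eviction rates worked out below. The helper is illustrative only; it just mirrors the switch above:

package main

import "fmt"

// effectiveQPS is an illustrative stand-in for setLimiterInZone: it returns the QPS that
// the zone's Tainter queue or Pod Evictor queue would be swapped to for a given state and size.
func effectiveQPS(state string, zoneSize int, evictionQPS, secondaryQPS float32, largeClusterThreshold int32) float32 {
    switch state {
    case "PartialDisruption":
        if int32(zoneSize) > largeClusterThreshold {
            return secondaryQPS // large zone: slow evictions down
        }
        return 0 // small zone: stop evictions entirely
    default: // "Normal" and "FullDisruption" both use the normal eviction rate
        return evictionQPS
    }
}

func main() {
    // 0.1 QPS normally (about one eviction every 10s),
    // 0.01 QPS (about one every 100s) for a partially disrupted zone larger than 50 nodes,
    // and 0 (no evictions) for a partially disrupted zone of 50 nodes or fewer.
    fmt.Println(effectiveQPS("Normal", 30, 0.1, 0.01, 50))             // 0.1
    fmt.Println(effectiveQPS("PartialDisruption", 100, 0.1, 0.01, 50)) // 0.01
    fmt.Println(effectiveQPS("PartialDisruption", 30, 0.1, 0.01, 50))  // 0
}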

Run TaintManager

Next, the Node Controller's Run method starts the Taint Manager:

if nc.runTaintManager {
    go nc.taintManager.Run(wait.NeverStop)
}

nc.runTaintManager is set via --enable-taint-manager and defaults to true, so the Taint Manager is started by default.

Next, let's look at what the Taint Manager's Run method does.

pkg/controller/node/taint_controller.go:179

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
    glog.V(0).Infof("Starting NoExecuteTaintManager")
    // Functions that are responsible for taking work items out of the workqueues and putting them
    // into channels.
    go func(stopCh <-chan struct{}) {
        for {
            item, shutdown := tc.nodeUpdateQueue.Get()
            if shutdown {
                break
            }
            nodeUpdate := item.(*nodeUpdateItem)
            select {
            case <-stopCh:
                break
            case tc.nodeUpdateChannel <- nodeUpdate:
            }
        }
    }(stopCh)

    go func(stopCh <-chan struct{}) {
        for {
            item, shutdown := tc.podUpdateQueue.Get()
            if shutdown {
                break
            }
            podUpdate := item.(*podUpdateItem)
            select {
            case <-stopCh:
                break
            case tc.podUpdateChannel <- podUpdate:
            }
        }
    }(stopCh)

    // When processing events we want to prioritize Node updates over Pod updates,
    // as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
    // we don't want user (or system) to wait until PodUpdate queue is drained before it can
    // start evicting Pods from tainted Nodes.
    for {
        select {
        case <-stopCh:
            break
        case nodeUpdate := <-tc.nodeUpdateChannel:
            tc.handleNodeUpdate(nodeUpdate)
        case podUpdate := <-tc.podUpdateChannel:
            // If we found a Pod update we need to empty Node queue first.
        priority:
            for {
                select {
                case nodeUpdate := <-tc.nodeUpdateChannel:
                    tc.handleNodeUpdate(nodeUpdate)
                default:
                    break priority
                }
            }
            // After Node queue is emptied we process podUpdate.
            tc.handlePodUpdate(podUpdate)
        }
    }
}

NoExecuteTaintManager's Run method therefore:

  • Starts a goroutine that takes nodeUpdate items from the nodeUpdateQueue and pushes them into tc.nodeUpdateChannel.
  • Starts a goroutine that takes podUpdate items from the podUpdateQueue and pushes them into tc.podUpdateChannel.
  • In the main loop, prioritizes node updates: tc.handleNodeUpdate is called for every pending nodeUpdate before tc.handlePodUpdate is called to process a podUpdate (the pattern is sketched below).
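The "drain node updates before touching a pod update" behavior relies on a labeled break out of a nested select. Here is a standalone sketch of the same pattern, with plain strings standing in for the nodeUpdate/podUpdate items:

package main

import "fmt"

func main() {
    highPriority := make(chan string, 10) // stands in for tc.nodeUpdateChannel
    lowPriority := make(chan string, 10)  // stands in for tc.podUpdateChannel

    highPriority <- "node-1"
    highPriority <- "node-2"
    lowPriority <- "pod-1"

    for processed := 0; processed < 3; {
        select {
        case n := <-highPriority:
            fmt.Println("handle node update:", n)
            processed++
        case p := <-lowPriority:
            // Before handling the low-priority item, drain every pending high-priority item,
            // just like the labeled "priority" loop in NoExecuteTaintManager.Run.
        priority:
            for {
                select {
                case n := <-highPriority:
                    fmt.Println("handle node update first:", n)
                    processed++
                default:
                    break priority
                }
            }
            fmt.Println("handle pod update:", p)
            processed++
        }
    }
}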

I will analyze NoExecuteTaintManager's handleNodeUpdate and handlePodUpdate in a later post dedicated to the Taint Manager, so I won't go deeper into them here.

doTaintingPass

If useTaintBasedEvictions is true, i.e. TaintBasedEvictions=true is specified in --feature-gates (the default is TaintBasedEvictions=false), doTaintingPass is invoked every 100ms (nodeEvictionPeriod).

Depending on whether the node condition is NotReady or Unknown, doTaintingPass calls the apiserver to apply the corresponding taint to the node: node.alpha.kubernetes.io/notReady or node.alpha.kubernetes.io/unreachable. A hypothetical sketch of the taint-selection step follows the excerpt below.

if nc.useTaintBasedEvictions {
    // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
    // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
    go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
}
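Conceptually, each tainting pass takes node names from the per-zone Tainter queue and makes sure the node carries exactly one of the two NoExecute taints, chosen from its Ready condition. The helper below is hypothetical (pickTaint does not exist in the source; the real pass goes through swapNodeControllerTaint and the apiserver); it only shows the selection step:

package main

import "fmt"

// Taint keys used by this release of the Node Controller (alpha names).
const (
    notReadyTaintKey    = "node.alpha.kubernetes.io/notReady"
    unreachableTaintKey = "node.alpha.kubernetes.io/unreachable"
)

// pickTaint is a hypothetical helper: given the node's Ready condition status
// ("False" or "Unknown"), return the NoExecute taint key the tainting pass would ensure.
func pickTaint(readyStatus string) (string, bool) {
    switch readyStatus {
    case "False":
        return notReadyTaintKey, true
    case "Unknown":
        return unreachableTaintKey, true
    default:
        return "", false // a Ready node gets neither NoExecute taint
    }
}

func main() {
    if key, ok := pickTaint("Unknown"); ok {
        fmt.Println("ensure taint:", key, "with effect NoExecute")
    }
}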

doEvictionPass

If useTaintBasedEvictions is false (the default, TaintBasedEvictions=false), doEvictionPass is invoked every 100ms (nodeEvictionPeriod) instead.

} else {
    // Managing eviction of nodes:
    // When we delete pods off a node, if the node was not empty at the time we then
    // queue an eviction watcher. If we hit an error, retry deletion.
    go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}

Let's look at the code of doEvictionPass:

pkg/controller/node/nodecontroller.go:481

func (nc *NodeController) doEvictionPass() {
    ...

    // Iterate over every zone's Pod Evictor, take node names from the Pod Evictor queue,
    // and call deletePods to delete all pods on the node (except pods that belong to a DaemonSet).
    for k := range nc.zonePodEvictor {
        // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
        nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            ...
            nodeUid, _ := value.UID.(string)
            remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
            ...
            return true, 0
        })
    }
}

The logic of doEvictionPass is:

  • Iterate over every zone's Pod Evictor and take node names from its Pod Evictor queue,
  • then call deletePods to delete all pods on that node (except pods that belong to a DaemonSet).

The code of deletePods is as follows:

pkg/controller/node/controller_utils.go:49

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
    ...
    // List all pod objects from the apiserver.
    pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(options)
    ...

    // Iterate over the pods one by one and pick out the ones running on this node.
    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }

        ...

        // if the pod has already been marked for deletion, we still return true that there are remaining pods.
        if pod.DeletionGracePeriodSeconds != nil {
            remaining = true
            continue
        }

        // if the pod is managed by a daemonset, ignore it
        _, err := daemonStore.GetPodDaemonSets(&pod)
        if err == nil { // No error means at least one daemonset was found
            continue
        }

        ...

        // Call the apiserver to delete the pod.
        if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
            return false, err
        }
        remaining = true
    }

    ...
    return remaining, nil
}

The main logic of deletePods (summarized as a small predicate in the sketch after this list) is:

  • List all pod objects from the apiserver.
  • Iterate over the pods one by one and pick out those running on the given node:
    • If the pod is already marked for deletion (pod.DeletionGracePeriodSeconds != nil), skip it.
    • If the pod is managed by a DaemonSet, skip it.
    • Otherwise, call the apiserver to delete the pod.
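Those skip rules can be summarized as a small predicate. The helper below is hypothetical and works on a stripped-down struct instead of v1.Pod and the DaemonSet lister; it only restates the filter:

package main

import "fmt"

// podInfo is a stripped-down stand-in for the fields deletePods actually inspects.
type podInfo struct {
    NodeName          string
    MarkedForDeletion bool // pod.DeletionGracePeriodSeconds != nil
    OwnedByDaemonSet  bool // daemonStore.GetPodDaemonSets(&pod) found a match
}

// shouldDelete mirrors the filter in deletePods: only pods on the target node that are not
// already terminating and are not managed by a DaemonSet get deleted.
func shouldDelete(p podInfo, nodeName string) bool {
    return p.NodeName == nodeName && !p.MarkedForDeletion && !p.OwnedByDaemonSet
}

func main() {
    fmt.Println(shouldDelete(podInfo{NodeName: "node-1"}, "node-1"))                         // true
    fmt.Println(shouldDelete(podInfo{NodeName: "node-1", OwnedByDaemonSet: true}, "node-1")) // false
    fmt.Println(shouldDelete(podInfo{NodeName: "node-2"}, "node-1"))                         // false
}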

This completes the analysis of all the main logic in the Node Controller's Run method.

The parts related to the Taint Manager's Run method were not analyzed in depth here; I will dedicate a follow-up post to the Taint Manager.
