Kubernetes Node Controller Source Code Analysis: The Creation Flow

Posted by WaltonWang

Author: xidianwangtao@gmail.com

Abstract: In my view, the Node Controller is one of the most important of the several dozen controllers in Kubernetes, easily in the top three, yet it is probably also the most complex one. I will therefore analyze its source code in a series of posts, hoping to build a deep understanding along the way. This post focuses on the creation flow of the Node Controller.

The NewNodeController Entry Point

When the Controller Manager starts, it launches a series of controllers. The Node Controller is one of the controllers started in the Controller Manager's StartControllers method, and the corresponding creation code is shown below.

cmd/kube-controller-manager/app/controllermanager.go:455

nodeController, err := nodecontroller.NewNodeController(
            sharedInformers.Core().V1().Pods(),
            sharedInformers.Core().V1().Nodes(),
            sharedInformers.Extensions().V1beta1().DaemonSets(),
            cloud,
            clientBuilder.ClientOrDie("node-controller"),
            s.PodEvictionTimeout.Duration,
            s.NodeEvictionRate,
            s.SecondaryNodeEvictionRate,
            s.LargeClusterSizeThreshold,
            s.UnhealthyZoneThreshold,
            s.NodeMonitorGracePeriod.Duration,
            s.NodeStartupGracePeriod.Duration,
            s.NodeMonitorPeriod.Duration,
            clusterCIDR,
            serviceCIDR,
            int(s.NodeCIDRMaskSize),
            s.AllocateNodeCIDRs,
            s.EnableTaintManager,
            utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
        )

As the call shows, the Node Controller mainly List/Watches the following objects from sharedInformers (a minimal sketch of how these informers are obtained from a shared informer factory follows the list):

  • Pods
  • Nodes
  • DaemonSets
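
For reference, here is a minimal sketch, assuming client-go's shared informer factory and a client-go version contemporary with this code (where DaemonSets were still served under extensions/v1beta1), of how these three informers are obtained. The kubeconfig path and resync period are purely illustrative.

package main

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a clientset from a kubeconfig; the path here is only an example.
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // One shared informer factory hands out the same three informers that
    // NewNodeController receives from the controller manager.
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    podInformer := factory.Core().V1().Pods()
    nodeInformer := factory.Core().V1().Nodes()
    daemonSetInformer := factory.Extensions().V1beta1().DaemonSets()

    // Each of these exposes Informer() for registering event handlers and
    // Lister() for reading from the local cache, exactly as NewNodeController uses them.
    _, _, _ = podInformer, nodeInformer, daemonSetInformer
}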

In addition, note the following (a simplified sketch of how a --feature-gates entry is parsed follows the list):

  • s.EnableTaintManager defaults to true, which means the Taint Manager is enabled by default; it can be set via --enable-taint-manager.
  • DefaultFeatureGate.Enabled(features.TaintBasedEvictions) defaults to false; it can be switched to true by adding TaintBasedEvictions=true to --feature-gates. When true, Pod eviction on a Node is carried out through the TaintManager.
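
To make the --feature-gates mechanism concrete, the sketch below is a simplified, hypothetical illustration of how a TaintBasedEvictions=true entry overrides the default value. parseFeatureGates is not the real utilfeature implementation, only a restatement of the key=value parsing and default-override behavior.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseFeatureGates is a hypothetical helper: it applies the built-in defaults
// first, then overrides them with the "Key=bool,Key=bool" entries from the
// --feature-gates flag value.
func parseFeatureGates(spec string, defaults map[string]bool) map[string]bool {
    gates := make(map[string]bool, len(defaults))
    for k, v := range defaults {
        gates[k] = v
    }
    for _, kv := range strings.Split(spec, ",") {
        parts := strings.SplitN(kv, "=", 2)
        if len(parts) != 2 {
            continue
        }
        if enabled, err := strconv.ParseBool(strings.TrimSpace(parts[1])); err == nil {
            gates[strings.TrimSpace(parts[0])] = enabled
        }
    }
    return gates
}

func main() {
    defaults := map[string]bool{"TaintBasedEvictions": false}
    gates := parseFeatureGates("TaintBasedEvictions=true", defaults)
    fmt.Println(gates["TaintBasedEvictions"]) // true: the flag overrides the Alpha default of false
}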

Supplement: the default Kubernetes feature gates are defined in the following code:

pkg/features/kube_features.go:100

var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
    ExternalTrafficLocalOnly:                    {Default: true, PreRelease: utilfeature.Beta},
    AppArmor:                                    {Default: true, PreRelease: utilfeature.Beta},
    DynamicKubeletConfig:                        {Default: false, PreRelease: utilfeature.Alpha},
    DynamicVolumeProvisioning:                   {Default: true, PreRelease: utilfeature.Alpha},
    ExperimentalHostUserNamespaceDefaultingGate: {Default: false, PreRelease: utilfeature.Beta},
    ExperimentalCriticalPodAnnotation:           {Default: false, PreRelease: utilfeature.Alpha},
    AffinityInAnnotations:                       {Default: false, PreRelease: utilfeature.Alpha},
    Accelerators:                                {Default: false, PreRelease: utilfeature.Alpha},
    TaintBasedEvictions:                         {Default: false, PreRelease: utilfeature.Alpha},

    // inherited features from generic apiserver, relisted here to get a conflict if it is changed
    // unintentionally on either side:
    StreamingProxyRedirects: {Default: true, PreRelease: utilfeature.Beta},
}

The NewNodeController Definition


func NewNodeController(
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    daemonSetInformer extensionsinformers.DaemonSetInformer,
    cloud cloudprovider.Interface,
    kubeClient clientset.Interface,
    podEvictionTimeout time.Duration,
    evictionLimiterQPS float32,
    secondaryEvictionLimiterQPS float32,
    largeClusterThreshold int32,
    unhealthyZoneThreshold float32,
    nodeMonitorGracePeriod time.Duration,
    nodeStartupGracePeriod time.Duration,
    nodeMonitorPeriod time.Duration,
    clusterCIDR *net.IPNet,
    serviceCIDR *net.IPNet,
    nodeCIDRMaskSize int,
    allocateNodeCIDRs bool,
    runTaintManager bool,
    useTaintBasedEvictions bool) (*NodeController, error) {

    ...

    nc := &NodeController{
        cloud:                           cloud,
        knownNodeSet:                    make(map[string]*v1.Node),
        kubeClient:                      kubeClient,
        recorder:                        recorder,
        podEvictionTimeout:              podEvictionTimeout,
        maximumGracePeriod:              5 * time.Minute, // Not configurable: "The maximum duration before a pod evicted from a node can be forcefully terminated"
        zonePodEvictor:                  make(map[string]*RateLimitedTimedQueue),
        zoneNotReadyOrUnreachableTainer: make(map[string]*RateLimitedTimedQueue),
        nodeStatusMap:                   make(map[string]nodeStatusData),
        nodeMonitorGracePeriod:          nodeMonitorGracePeriod,
        nodeMonitorPeriod:               nodeMonitorPeriod,
        nodeStartupGracePeriod:          nodeStartupGracePeriod,
        lookupIP:                        net.LookupIP,
        now:                             metav1.Now,
        clusterCIDR:                     clusterCIDR,
        serviceCIDR:                     serviceCIDR,
        allocateNodeCIDRs:               allocateNodeCIDRs,
        forcefullyDeletePod:             func(p *v1.Pod) error { return forcefullyDeletePod(kubeClient, p) },
        nodeExistsInCloudProvider:       func(nodeName types.NodeName) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
        evictionLimiterQPS:              evictionLimiterQPS,
        secondaryEvictionLimiterQPS:     secondaryEvictionLimiterQPS,
        largeClusterThreshold:           largeClusterThreshold,
        unhealthyZoneThreshold:          unhealthyZoneThreshold,
        zoneStates:                      make(map[string]zoneState),
        runTaintManager:                 runTaintManager,
        useTaintBasedEvictions:          useTaintBasedEvictions && runTaintManager,
    }

    ...

    // Register enterPartialDisruptionFunc as ReducedQPSFunc: when a zone's state is "PartialDisruption", ReducedQPSFunc is invoked to setLimiterInZone.
    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc

    // Register enterFullDisruptionFunc as HealthyQPSFunc: when a zone's state is "FullDisruption", HealthyQPSFunc is invoked to setLimiterInZone.
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc

    // Register computeZoneStateFunc as ComputeZoneState: during handleDisruption, ComputeZoneState is invoked to compute the number of unhealthy nodes in the cluster and the zone state.
    nc.computeZoneStateFunc = nc.ComputeZoneState


    // Register the PodInformer event handlers: Add, Update, Delete.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{

        // For Pod Add and Update events, the kubelet version on the Pod's Node is checked; if it is lower than 1.1.0, forcefullyDeletePod calls the apiserver directly to delete the Pod object from etcd.
        // For Pod Add, Update and Delete events, if the TaintManager is enabled, the old Pod's and new Pod's Tolerations are compared; if they differ, the Pod update is added to the NoExecuteTaintManager's podUpdateQueue and handed over to the Taint Controller. For Delete events, newPod is nil.
        AddFunc: func(obj interface{}) {
            nc.maybeDeleteTerminatingPod(obj)
            pod := obj.(*v1.Pod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(nil, pod)
            }
        },
        UpdateFunc: func(prev, obj interface{}) {
            nc.maybeDeleteTerminatingPod(obj)
            prevPod := prev.(*v1.Pod)
            newPod := obj.(*v1.Pod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(prevPod, newPod)
            }
        },
        DeleteFunc: func(obj interface{}) {
            pod, isPod := obj.(*v1.Pod)
            // We can get DeletedFinalStateUnknown instead of *v1.Node here and we need to handle that correctly. #34692
            if !isPod {
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                    glog.Errorf("Received unexpected object: %v", obj)
                    return
                }
                pod, ok = deletedState.Obj.(*v1.Pod)
                if !ok {
                    glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
                    return
                }
            }
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(pod, nil)
            }
        },
    })

    // returns true if the shared informer's store has synced.
    nc.podInformerSynced = podInformer.Informer().HasSynced


    // Register the NodeInformer event handlers: Add, Update, Delete.
    nodeEventHandlerFuncs := cache.ResourceEventHandlerFuncs{}
    if nc.allocateNodeCIDRs {
        // --allocate-node-cidrs: Should CIDRs for Pods be allocated and set on the cloud provider.
        ...
    } else {
        nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{

            // For Node Add, Update and Delete events, if the TaintManager is enabled, the old Node's and new Node's Taints are compared; if they differ, the Node update is added to the NoExecuteTaintManager's nodeUpdateQueue and handed over to the Taint Controller. For Delete events, newNode is nil.
            AddFunc: func(originalObj interface{}) {
                obj, err := api.Scheme.DeepCopy(originalObj)
                if err != nil {
                    utilruntime.HandleError(err)
                    return
                }
                node := obj.(*v1.Node)
                if nc.taintManager != nil {
                    nc.taintManager.NodeUpdated(nil, node)
                }
            },
            UpdateFunc: func(oldNode, newNode interface{}) {
                node := newNode.(*v1.Node)
                prevNode := oldNode.(*v1.Node)
                if nc.taintManager != nil {
                    nc.taintManager.NodeUpdated(prevNode, node)
                }
            },
            DeleteFunc: func(originalObj interface{}) {
                obj, err := api.Scheme.DeepCopy(originalObj)
                if err != nil {
                    utilruntime.HandleError(err)
                    return
                }

                node, isNode := obj.(*v1.Node)
                // We can get DeletedFinalStateUnknown instead of *v1.Node here and we need to handle that correctly. #34692
                if !isNode {
                    deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
                    if !ok {
                        glog.Errorf("Received unexpected object: %v", obj)
                        return
                    }
                    node, ok = deletedState.Obj.(*v1.Node)
                    if !ok {
                        glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
                        return
                    }
                }
                if nc.taintManager != nil {
                    nc.taintManager.NodeUpdated(node, nil)
                }
            },
        }
    }

    // Register NoExecuteTaintManager as the taintManager.
    if nc.runTaintManager {
        nc.taintManager = NewNoExecuteTaintManager(kubeClient)
    }
    nodeInformer.Informer().AddEventHandler(nodeEventHandlerFuncs)
    nc.nodeLister = nodeInformer.Lister()

    // returns true if the shared informer's nodeStore has synced.
    nc.nodeInformerSynced = nodeInformer.Informer().HasSynced

    // returns true if the shared informer's daemonSetStore has synced.
    nc.daemonSetStore = daemonSetInformer.Lister()
    nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced

    return nc, nil
}

Therefore, creating the NodeController instance mainly does the following (a short sketch after this list shows how the registered *InformerSynced functions are typically consumed):

  • maximumGracePeriod - The maximum duration before a pod evicted from a node can be forcefully terminated. Not configurable; hard-coded to 5 minutes.
  • Register enterPartialDisruptionFunc as ReducedQPSFunc: when a zone's state is "PartialDisruption", ReducedQPSFunc is invoked to setLimiterInZone.
  • Register enterFullDisruptionFunc as HealthyQPSFunc: when a zone's state is "FullDisruption", HealthyQPSFunc is invoked to setLimiterInZone.
  • Register computeZoneStateFunc as ComputeZoneState: during handleDisruption, ComputeZoneState is invoked to compute the number of unhealthy nodes in the cluster and the zone state.
  • Register the PodInformer event handlers: Add, Update, Delete.
    • For Pod Add and Update events, the kubelet version on the Pod's Node is checked; if it is lower than 1.1.0, forcefullyDeletePod calls the apiserver directly to delete the Pod object from etcd.
    • For Pod Add, Update and Delete events, if the TaintManager is enabled, the old Pod's and new Pod's Tolerations are compared; if they differ, the Pod update is added to the NoExecuteTaintManager's podUpdateQueue and handed over to the Taint Controller. For Delete events, newPod is nil.
  • Register podInformerSynced, used to check whether the shared informer's Pod store has synced.
  • Register the NodeInformer event handlers: Add, Update, Delete.
    • For Node Add, Update and Delete events, if the TaintManager is enabled, the old Node's and new Node's Taints are compared; if they differ, the Node update is added to the NoExecuteTaintManager's nodeUpdateQueue and handed over to the Taint Controller. For Delete events, newNode is nil.
  • Register NoExecuteTaintManager as the taintManager.
  • Register nodeInformerSynced, used to check whether the shared informer's Node store has synced.
  • Register daemonSetInformerSynced, used to check whether the shared informer's DaemonSet store has synced.
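
The three *InformerSynced fields are plain cache.InformerSynced functions, and the controller's Run() must block on all of them before doing any work. Below is a minimal sketch of that pattern, assuming only client-go's cache package and apimachinery's runtime helpers; waitForSync and the alwaysSynced stub are hypothetical names used only for illustration.

package main

import (
    "fmt"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/tools/cache"
)

// waitForSync blocks until every provided InformerSynced function returns true,
// or gives up when stopCh is closed. In the real controller the arguments would
// be nc.podInformerSynced, nc.nodeInformerSynced and nc.daemonSetInformerSynced.
func waitForSync(stopCh <-chan struct{}, synced ...cache.InformerSynced) bool {
    if !cache.WaitForCacheSync(stopCh, synced...) {
        utilruntime.HandleError(fmt.Errorf("timed out waiting for informer caches to sync"))
        return false
    }
    return true
}

func main() {
    stopCh := make(chan struct{})
    defer close(stopCh)

    // Stand-in for an already-synced informer cache.
    alwaysSynced := func() bool { return true }
    fmt.Println(waitForSync(stopCh, alwaysSynced)) // true
}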

About ZoneState

ZoneState was mentioned above; how it is derived can be seen in the following code:

pkg/api/v1/types.go:3277

const (
    // NodeReady means kubelet is healthy and ready to accept pods.
    NodeReady NodeConditionType = "Ready"
    // NodeOutOfDisk means the kubelet will not accept new pods due to insufficient free disk
    // space on the node.
    NodeOutOfDisk NodeConditionType = "OutOfDisk"
    // NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory.
    NodeMemoryPressure NodeConditionType = "MemoryPressure"
    // NodeDiskPressure means the kubelet is under pressure due to insufficient available disk.
    NodeDiskPressure NodeConditionType = "DiskPressure"
    // NodeNetworkUnavailable means that network for the node is not correctly configured.
    NodeNetworkUnavailable NodeConditionType = "NetworkUnavailable"
    // NodeInodePressure means the kubelet is under pressure due to insufficient available inodes.
    NodeInodePressure NodeConditionType = "InodePressure"
)



pkg/controller/node/nodecontroller.go:1149

// This function is expected to get a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there're no Ready Nodes,
// - partiallyDisrupted if at least than nc.unhealthyZoneThreshold percent of Nodes are not Ready,
// - normal otherwise
func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, zoneState) {
    readyNodes := 0
    notReadyNodes := 0
    for i := range nodeReadyConditions {
        if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
            readyNodes++
        } else {
            notReadyNodes++
        }
    }
    switch {
    case readyNodes == 0 && notReadyNodes > 0:
        return notReadyNodes, stateFullDisruption
    case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
        return notReadyNodes, statePartialDisruption
    default:
        return notReadyNodes, stateNormal
    }
}

The zone state falls into three types (a small standalone sketch after this list makes the thresholds concrete):

  • FullDisruption: the number of Ready nodes is 0 and the number of NotReady nodes is greater than 0.
  • PartialDisruption: the number of NotReady nodes is greater than 2 and notReadyNodes/(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold, where nc.unhealthyZoneThreshold is set via --unhealthy-zone-threshold and defaults to 0.55.
  • Normal: every other combination is treated as Normal.
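
The following is a self-contained sketch that restates ComputeZoneState's decision rule outside the controller, purely to make the thresholds above concrete; classifyZone and its sample inputs are illustrative and not part of the Kubernetes code base.

package main

import "fmt"

type zoneState string

const (
    stateNormal            zoneState = "Normal"
    stateFullDisruption    zoneState = "FullDisruption"
    statePartialDisruption zoneState = "PartialDisruption"
)

// classifyZone mirrors ComputeZoneState's switch: FullDisruption when no node
// is Ready, PartialDisruption when more than 2 nodes are NotReady and their
// ratio reaches the unhealthy-zone threshold, Normal otherwise.
func classifyZone(readyNodes, notReadyNodes int, unhealthyZoneThreshold float32) zoneState {
    switch {
    case readyNodes == 0 && notReadyNodes > 0:
        return stateFullDisruption
    case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= unhealthyZoneThreshold:
        return statePartialDisruption
    default:
        return stateNormal
    }
}

func main() {
    // With the default --unhealthy-zone-threshold=0.55:
    fmt.Println(classifyZone(0, 3, 0.55)) // FullDisruption: no Ready nodes at all
    fmt.Println(classifyZone(4, 6, 0.55)) // PartialDisruption: 6/10 = 0.6 >= 0.55 and notReady > 2
    fmt.Println(classifyZone(8, 2, 0.55)) // Normal: notReady <= 2, so the ratio is not even checked
}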
