Kubernetes Node Controller Source Code Analysis: Execution
Posted by WaltonWang
Author: xidianwangtao@gmail.com
Abstract: In my view, the Node Controller is one of the most important of the several dozen controllers in Kubernetes (easily in its top three), yet it is probably also the most complex one. I will therefore cover its source code in a series of posts. This post takes NodeController's Run method as the entry point and traces through how the controller works.
Execution of the Node Controller
The Node Controller's Run method is shown below; it is the entry point for all of the controller's real processing logic.
pkg/controller/node/nodecontroller.go:550
```go
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
	go func() {
		defer utilruntime.HandleCrash()

		if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
			utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
			return
		}

		// Incorporate the results of node status pushed from kubelet to master.
		go wait.Until(func() {
			if err := nc.monitorNodeStatus(); err != nil {
				glog.Errorf("Error monitoring node status: %v", err)
			}
		}, nc.nodeMonitorPeriod, wait.NeverStop)

		if nc.runTaintManager {
			go nc.taintManager.Run(wait.NeverStop)
		}

		if nc.useTaintBasedEvictions {
			// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
			// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
			go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
		} else {
			// Managing eviction of nodes:
			// When we delete pods off a node, if the node was not empty at the time we then
			// queue an eviction watcher. If we hit an error, retry deletion.
			go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
		}
	}()
}
```
WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced)
- The Node Controller first calls WaitForCacheSync and waits until the HasSynced functions of the PodInformer, NodeInformer, and DaemonSetInformer all return true, i.e. until these three kinds of API objects have finished syncing.
vendor/k8s.io/client-go/tools/cache/shared_informer.go:100
```go
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the contoller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	// Every syncedPollPeriod (100ms), run through the InformerSynced functions in cacheSyncs.
	// Once all of them return true -- i.e. every required cache has synced --
	// WaitForCacheSync returns true; otherwise it keeps polling.
	err := wait.PollUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		glog.V(2).Infof("stop requested")
		return false
	}

	glog.V(4).Infof("caches populated")
	return true
}
```
WaitForCacheSync works as follows: every 100ms (syncedPollPeriod) it runs through the InformerSynced functions in cacheSyncs. Once all of them return true, meaning every required cache has synced, WaitForCacheSync returns true; otherwise it keeps polling on the same 100ms period until it either returns true or receives the stop signal.
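To make the control flow concrete, here is a minimal, dependency-free sketch of the same polling loop. This is only an illustration of the pattern; the real implementation is client-go's wait.PollUntil shown above, and waitForSync is a made-up name.

```go
package main

import (
	"fmt"
	"time"
)

// waitForSync polls every 100ms until all provided "synced" functions return true,
// or gives up when stopCh is closed. Illustration only, not the client-go code.
func waitForSync(stopCh <-chan struct{}, synced ...func() bool) bool {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return false
		case <-ticker.C:
			allSynced := true
			for _, fn := range synced {
				if !fn() {
					allSynced = false
					break
				}
			}
			if allSynced {
				return true
			}
		}
	}
}

func main() {
	start := time.Now()
	// Pretend the cache becomes "synced" after 300ms.
	fmt.Println(waitForSync(make(chan struct{}), func() bool {
		return time.Since(start) > 300*time.Millisecond
	}))
}
```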
Run then starts a goroutine that calls monitorNodeStatus every nodeMonitorPeriod (default 5s) to monitor node status.
pkg/controller/node/nodecontroller.go:586
```go
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {
	// We are listing nodes from local cache as we can tolerate some small delays
	// comparing to state from etcd and there is eventual consistency anyway.
	nodes, err := nc.nodeLister.List(labels.Everything())
	if err != nil {
		return err
	}

	// Compare knownNodeSet with nodes to obtain the added and deleted node lists.
	added, deleted := nc.checkForNodeAddedDeleted(nodes)

	// Iterate over the added nodes, i.e. nodes the Node Controller sees joining the cluster.
	for i := range added {
		...
		// Add the new node to knownNodeSet.
		nc.knownNodeSet[added[i].Name] = added[i]
		// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
		zone := utilnode.GetZoneKey(added[i])
		if _, found := nc.zoneStates[zone]; !found {
			// Mark the node's new zone as "Initial".
			nc.zoneStates[zone] = stateInitial
			// If useTaintBasedEvictions is false (set via --feature-gates, default TaintBasedEvictions=false),
			// create the zone's zonePodEvictor with evictionLimiterQPS (--node-eviction-rate, default 0.1).
			if !nc.useTaintBasedEvictions {
				nc.zonePodEvictor[zone] =
					NewRateLimitedTimedQueue(
						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
			} else {
				// If useTaintBasedEvictions is true, create the zone's zoneNotReadyOrUnreachableTainer
				// with the same evictionLimiterQPS.
				nc.zoneNotReadyOrUnreachableTainer[zone] =
					NewRateLimitedTimedQueue(
						flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
			}
		}
		...
		// If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the controller-managed
		// taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable) from the node,
		// and remove the node from the zoneNotReadyOrUnreachableTainer queue (if it is in that queue).
		if nc.useTaintBasedEvictions {
			nc.markNodeAsHealthy(added[i])
		} else {
			// If useTaintBasedEvictions is false, i.e. zonePodEvictor is used,
			// remove the node from the corresponding zonePodEvictor queue.
			nc.cancelPodEviction(added[i])
		}
	}

	// Iterate over the deleted nodes and remove them from knownNodeSet.
	for i := range deleted {
		...
		delete(nc.knownNodeSet, deleted[i].Name)
	}

	zoneToNodeConditions := map[string][]*v1.NodeCondition{}
	for i := range nodes {
		...
		// PollImmediate tries a condition func until it returns true, an error, or the timeout is reached.
		// retrySleepTime is 20ms, so the overall timeout is retrySleepTime*nodeStatusUpdateRetry = 100ms.
		if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {
			// nc.tryUpdateNodeStatus - For a given node checks its conditions and tries to update it.
			// Returns grace period to which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
			gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
			if err == nil {
				return true, nil
			}
			...
		}); err != nil {
			...
		}
		...
		// For non-master nodes, append the node's ready condition to the zoneToNodeConditions map.
		// We do not treat a master node as a part of the cluster for network disruption checking.
		if !system.IsMasterNode(node.Name) {
			zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
		}

		decisionTimestamp := nc.now()
		if currentReadyCondition != nil {
			// When the observed ready condition is NotReady, handle it according to useTaintBasedEvictions.
			// Check eviction timeout against decisionTimestamp
			if observedReadyCondition.Status == v1.ConditionFalse {
				// useTaintBasedEvictions is true:
				if nc.useTaintBasedEvictions {
					// If the node already carries the UnreachableTaint, swap it for the NotReadyTaint.
					// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
					if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
						taintToAdd := *NotReadyTaintTemplate
						if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
							...
						}
					// Otherwise add the node to the Tainter queue, to be handled by doTaintingPass.
					} else if nc.markNodeForTainting(node) {
						...
					}
				// useTaintBasedEvictions is false, i.e. pod eviction is used:
				} else {
					// Only queue the node once decisionTimestamp (now) is later than
					// readyTransitionTimestamp + podEvictionTimeout (default 5min).
					if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
						// Add the node to the pod evictor queue, to be handled by doEvictionPass.
						if nc.evictPods(node) {
							...
						}
					}
				}
			}
			// Similarly, when the observed ready condition is Unknown, handle it according to useTaintBasedEvictions.
			if observedReadyCondition.Status == v1.ConditionUnknown {
				// useTaintBasedEvictions is true:
				if nc.useTaintBasedEvictions {
					// If the node already carries the NotReadyTaint, swap it for the UnreachableTaint.
					// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
					if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
						taintToAdd := *UnreachableTaintTemplate
						if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
							...
						}
					// Otherwise add the node to the Tainter queue, to be handled by doTaintingPass.
					} else if nc.markNodeForTainting(node) {
						...
					}
				// useTaintBasedEvictions is false, i.e. pod eviction is used:
				} else {
					// Only queue the node once decisionTimestamp (now) is later than
					// probeTimestamp + podEvictionTimeout (default 5min).
					if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
						// Add the node to the pod evictor queue, to be handled by doEvictionPass.
						if nc.evictPods(node) {
							...
						}
					}
				}
			}
			// Similarly, when the observed ready condition is True, handle it according to useTaintBasedEvictions.
			if observedReadyCondition.Status == v1.ConditionTrue {
				// useTaintBasedEvictions is true:
				if nc.useTaintBasedEvictions {
					// Remove the controller-managed taints and remove the node from the
					// zoneNotReadyOrUnreachableTainer queue (if it is in that queue).
					removed, err := nc.markNodeAsHealthy(node)
					...
				} else {
					// useTaintBasedEvictions is false: remove the node from the corresponding zonePodEvictor queue.
					if nc.cancelPodEviction(node) {
						...
					}
				}
			}

			// If the node status transitioned from Ready to NotReady, mark all pods on the node as NotReady.
			// Report node event.
			if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
				recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
				if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
				}
			}

			// Check with the cloud provider to see if the node still exists. If it
			// doesn't, delete the node immediately.
			...
		}
	}

	// Handle disruption.
	nc.handleDisruption(zoneToNodeConditions, nodes)

	return nil
}
```
To summarize monitorNodeStatus:
- Compare knownNodeSet with the listed nodes to obtain the added and deleted node lists.
- Iterate over the added nodes, i.e. nodes the Node Controller sees joining the cluster:
  - Add the node to knownNodeSet.
  - When adding new Nodes we need to check if a new zone appeared, and if so add a new evictor:
    - Mark the node's new zone as "Initial".
    - If useTaintBasedEvictions is false (set via --feature-gates; default TaintBasedEvictions=false), create the zone's zonePodEvictor with evictionLimiterQPS (set via --node-eviction-rate, default 0.1).
    - If useTaintBasedEvictions is true, create the zone's zoneNotReadyOrUnreachableTainer with the same evictionLimiterQPS.
  - If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the controller-managed taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable) from the node and remove it from the zoneNotReadyOrUnreachableTainer queue (if it is in that queue).
  - If useTaintBasedEvictions is false, i.e. zonePodEvictor is used, remove the node from the corresponding zonePodEvictor queue.
- Iterate over the deleted nodes and remove them from knownNodeSet.
- Iterate over all nodes:
  - Update the node status and obtain the previously observed and current ready conditions.
  - For non-master nodes, append the node's ready condition to the zoneToNodeConditions map.
  - When the observed ready condition is NotReady, handle it according to useTaintBasedEvictions:
    - If useTaintBasedEvictions is true: if the node already carries the UnreachableTaint, swap it for the NotReadyTaint; otherwise add the node to the Tainter queue, to be handled by doTaintingPass.
    - If useTaintBasedEvictions is false, pod eviction is used instead: once decisionTimestamp (now) is later than readyTransitionTimestamp + podEvictionTimeout (default 5min), add the node to the pod evictor queue, to be handled by doEvictionPass.
  - Similarly, when the observed ready condition is Unknown:
    - If useTaintBasedEvictions is true: if the node already carries the NotReadyTaint, swap it for the UnreachableTaint; otherwise add the node to the Tainter queue, to be handled by doTaintingPass.
    - If useTaintBasedEvictions is false, pod eviction is used instead: once decisionTimestamp (now) is later than probeTimestamp + podEvictionTimeout (default 5min), add the node to the pod evictor queue, to be handled by doEvictionPass.
  - Similarly, when the observed ready condition is True:
    - If useTaintBasedEvictions is true, clear the controller-managed taints and remove the node from the zoneNotReadyOrUnreachableTainer queue (if it is in that queue).
    - If useTaintBasedEvictions is false, remove the node from the corresponding zonePodEvictor queue.
  - If the node status transitioned from Ready to NotReady, mark all pods on the node as NotReady.
- Finally, call handleDisruption (a condensed sketch of the per-node decision flow follows this list).
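As referenced above, here is a condensed, hypothetical sketch of the per-node decision table. The decide function and its return strings are invented purely for illustration; the real code calls swapNodeControllerTaint, markNodeForTainting, evictPods, markNodeAsHealthy, and cancelPodEviction as shown in the listing above.

```go
package main

import "fmt"

// decide summarizes what monitorNodeStatus does for a single node, given its observed
// ready condition. Purely illustrative; names and return values are not real API.
func decide(readyStatus string, useTaintBasedEvictions, evictionTimeoutExpired bool) string {
	switch readyStatus {
	case "False", "Unknown": // NotReady or Unknown
		if useTaintBasedEvictions {
			return "swap/add the NoExecute taint and queue the node for doTaintingPass"
		}
		if evictionTimeoutExpired { // podEvictionTimeout (default 5min) has elapsed
			return "queue the node for doEvictionPass"
		}
		return "wait for podEvictionTimeout to elapse"
	default: // "True"
		if useTaintBasedEvictions {
			return "remove the controller-managed taints and drop the node from the Tainter queue"
		}
		return "drop the node from the pod evictor queue"
	}
}

func main() {
	fmt.Println(decide("Unknown", true, false)) // taint-based path
	fmt.Println(decide("False", false, true))   // pod-eviction path
}
```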
Next, let's look at the logic of handleDisruption:
pkg/controller/node/nodecontroller.go:772
```go
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
	newZoneStates := map[string]zoneState{}

	// Based on zoneToNodeConditions, decide whether allAreFullyDisrupted is true,
	// i.e. whether every zone observed in the current pass is in the FullDisruption state.
	allAreFullyDisrupted := true
	for k, v := range zoneToNodeConditions {
		ZoneSize.WithLabelValues(k).Set(float64(len(v)))
		unhealthy, newState := nc.computeZoneStateFunc(v)
		ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
		UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
		if newState != stateFullDisruption {
			allAreFullyDisrupted = false
		}
		newZoneStates[k] = newState
		if _, had := nc.zoneStates[k]; !had {
			glog.Errorf("Setting initial state for unseen zone: %v", k)
			nc.zoneStates[k] = stateInitial
		}
	}

	// Based on nc.zoneStates, decide whether allWasFullyDisrupted is true,
	// i.e. whether every zone was in the FullDisruption state in the previous pass.
	allWasFullyDisrupted := true
	for k, v := range nc.zoneStates {
		if _, have := zoneToNodeConditions[k]; !have {
			ZoneSize.WithLabelValues(k).Set(0)
			ZoneHealth.WithLabelValues(k).Set(100)
			UnhealthyNodes.WithLabelValues(k).Set(0)
			delete(nc.zoneStates, k)
			continue
		}
		if v != stateFullDisruption {
			allWasFullyDisrupted = false
			break
		}
	}

	// At least one node was responding in previous pass or in the current pass. Semantics is as follows:
	// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
	// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
	// - if new state is "fullDisruption" we restore normal eviction rate,
	//   unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
	if !allAreFullyDisrupted || !allWasFullyDisrupted {
		// allAreFullyDisrupted is true and allWasFullyDisrupted is false: the cluster just
		// switched from not-FullDisruption to FullDisruption, so iterate over all nodes.
		// We're switching to full disruption mode
		if allAreFullyDisrupted {
			glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
			for i := range nodes {
				// If useTaintBasedEvictions is true, mark the node as healthy
				// (remove the taints from the node and remove it from the Tainter queue).
				if nc.useTaintBasedEvictions {
					_, err := nc.markNodeAsHealthy(nodes[i])
					if err != nil {
						glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
					}
				} else {
					// If useTaintBasedEvictions is false, cancel pod eviction for the node
					// (by removing it from the zone's pod evictor queue).
					nc.cancelPodEviction(nodes[i])
				}
			}
			// Set the rate limiter of every zone's Tainter queue or pod evictor queue to 0,
			// which stops all evictions.
			// We stop all evictions.
			for k := range nc.zoneStates {
				if nc.useTaintBasedEvictions {
					nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
				} else {
					nc.zonePodEvictor[k].SwapLimiter(0)
				}
			}
			// Update every zone's state (nc.zoneStates) to FullDisruption.
			for k := range nc.zoneStates {
				nc.zoneStates[k] = stateFullDisruption
			}
			// All rate limiters are updated, so we can return early here.
			return
		}
		// allWasFullyDisrupted is true and allAreFullyDisrupted is false: the cluster just
		// switched from FullDisruption to not-FullDisruption, so iterate over all nodes.
		// We're exiting full disruption mode
		if allWasFullyDisrupted {
			glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
			// When exiting disruption mode update probe timestamps on all Nodes.
			now := nc.now()
			for i := range nodes {
				v := nc.nodeStatusMap[nodes[i].Name]
				v.probeTimestamp = now
				v.readyTransitionTimestamp = now
				nc.nodeStatusMap[nodes[i].Name] = v
			}
			// Reset each zone's disruption rate limiter according to the zone size and zone state.
			// We reset all rate limiters to settings appropriate for the given state.
			for k := range nc.zoneStates {
				nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
				nc.zoneStates[k] = newZoneStates[k]
			}
			return
		}
		// Both allWasFullyDisrupted and allAreFullyDisrupted are false: the cluster stays in a
		// not-FullDisruption state, so reset the disruption rate limiter of each zone whose state
		// changed, according to its zone size and new state.
		// We know that there's at least one not-fully disrupted so,
		// we can use default behavior for rate limiters
		for k, v := range nc.zoneStates {
			newState := newZoneStates[k]
			if v == newState {
				continue
			}
			glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
			nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
			nc.zoneStates[k] = newState
		}
	}
}
```
So the logic of handleDisruption is:
- From zoneToNodeConditions, decide whether allAreFullyDisrupted is true (whether, based on the node conditions observed in this pass, every zone in the cluster is currently in the FullDisruption state).
- From nc.zoneStates, decide whether allWasFullyDisrupted is true (whether, based on the previous pass, every zone in the cluster was in the FullDisruption state).
- If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster is switching into full disruption mode:
  - For every node: if useTaintBasedEvictions is true, mark the node as healthy (remove its taints and remove it from the Tainter queue); if useTaintBasedEvictions is false, cancel pod eviction for the node (by removing it from the zone's pod evictor queue).
  - Set the rate limiter of every zone's Tainter queue or pod evictor queue to 0, which stops all evictions.
  - Update every zone's state (nc.zoneStates) to FullDisruption.
- If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster is exiting full disruption mode:
  - Update the probe timestamps on all nodes.
  - Call setLimiterInZone to reset each zone's disruption rate limiter according to its zone size and zone state.
- If both allWasFullyDisrupted and allAreFullyDisrupted are false, the cluster stays out of full disruption mode: call setLimiterInZone to reset the rate limiter of each zone whose state changed, according to its zone size and new state.
A small self-contained sketch of these transitions follows this list.
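As referenced above, here is a hedged, self-contained sketch of the four transition cases. Zone states are reduced to plain strings and the helper names are invented; the real code also updates Prometheus metrics and swaps the per-zone limiters as shown in the listing.

```go
package main

import "fmt"

// allFullyDisrupted reports whether every zone in the map is fully disrupted.
func allFullyDisrupted(states map[string]string) bool {
	for _, s := range states {
		if s != "fullDisruption" {
			return false
		}
	}
	return true
}

// transition mirrors the branch structure of handleDisruption. Illustration only.
func transition(previous, current map[string]string) string {
	was, are := allFullyDisrupted(previous), allFullyDisrupted(current)
	switch {
	case are && !was:
		return "entering master disruption mode: stop all evictions (limiter QPS = 0)"
	case was && !are:
		return "exiting master disruption mode: refresh probe timestamps, reset limiters per zone"
	case !was && !are:
		return "normal operation: adjust limiters only for zones whose state changed"
	default:
		return "still fully disrupted: nothing to do"
	}
}

func main() {
	prev := map[string]string{"zone-a": "normal", "zone-b": "normal"}
	cur := map[string]string{"zone-a": "fullDisruption", "zone-b": "fullDisruption"}
	fmt.Println(transition(prev, cur))
}
```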
Next, let's look at setLimiterInZone and see how it maps zone size and zone state to different rate limiters.
pkg/controller/node/nodecontroller.go:870
```go
func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
	switch state {
	// If the zone state is Normal, set the rate limiter to evictionLimiterQPS (default 0.1).
	case stateNormal:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
		}
	// If the zone state is PartialDisruption, let nc.enterPartialDisruptionFunc decide the rate limiter.
	case statePartialDisruption:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
				nc.enterPartialDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(
				nc.enterPartialDisruptionFunc(zoneSize))
		}
	// If the zone state is FullDisruption, let nc.enterFullDisruptionFunc decide the rate limiter.
	case stateFullDisruption:
		if nc.useTaintBasedEvictions {
			nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
				nc.enterFullDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(
				nc.enterFullDisruptionFunc(zoneSize))
		}
	}
}
```
nc.enterFullDisruptionFunc and nc.enterPartialDisruptionFunc are registered when NewNodeController creates the Node Controller:
pkg/controller/node/nodecontroller.go:270
```go
func NewNodeController(...) (*NodeController, error) {
	...
	nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
	nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
	...
}
```
pkg/controller/node/nodecontroller.go:1132
```go
// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
	return nc.evictionLimiterQPS
}

// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
	if int32(nodeNum) > nc.largeClusterThreshold {
		return nc.secondaryEvictionLimiterQPS
	}
	return 0
}
```
So setLimiterInZone works as follows:
- If the zone state is PartialDisruption, the Tainter queue's or pod evictor queue's rate limiter is set to secondaryEvictionLimiterQPS (default 0.01) when the zone size exceeds nc.largeClusterThreshold (default 50), and to 0 otherwise, i.e. evictions in small zones are stopped altogether.
- If the zone state is FullDisruption, the rate limiter is set to evictionLimiterQPS (default 0.1).
- If the zone state is Normal, the rate limiter is likewise set to evictionLimiterQPS (default 0.1).
A short illustration of what these QPS values mean in practice follows.
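The QPS passed to SwapLimiter is the token refill rate of the zone's rate-limited queue, so 1/QPS is roughly the steady-state interval between two evictions (or taint additions) in that zone. The snippet below only restates the defaults discussed above; it is an illustration, not controller code.

```go
package main

import "fmt"

// Print what each default eviction QPS means in terms of eviction spacing.
func main() {
	limits := map[string]float32{
		"normal (evictionLimiterQPS)":                     0.1,  // default --node-eviction-rate
		"partialDisruption, zone > largeClusterThreshold": 0.01, // default --secondary-node-eviction-rate
		"partialDisruption, small zone":                   0,    // evictions stopped
		"fullDisruption (some zones still healthy)":       0.1,  // back to evictionLimiterQPS
		"fullDisruption (every zone disrupted)":           0,    // handleDisruption stops all evictions
	}
	for state, qps := range limits {
		if qps == 0 {
			fmt.Printf("%-48s -> no evictions\n", state)
			continue
		}
		fmt.Printf("%-48s -> about one eviction every %.0f seconds\n", state, 1/qps)
	}
}
```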
Run TaintManager
Back in the Node Controller's Run method, the Taint Manager is started next:

```go
if nc.runTaintManager {
	go nc.taintManager.Run(wait.NeverStop)
}
```

nc.runTaintManager is controlled by --enable-taint-manager and defaults to true, so the Taint Manager is started by default.
Next, let's see what the Taint Manager's Run method does.
pkg/controller/node/taint_controller.go:179
```go
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	glog.V(0).Infof("Starting NoExecuteTaintManager")
	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(*nodeUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.nodeUpdateChannel <- nodeUpdate:
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(*podUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.podUpdateChannel <- podUpdate:
			}
		}
	}(stopCh)

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			break
		case nodeUpdate := <-tc.nodeUpdateChannel:
			tc.handleNodeUpdate(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannel:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannel:
					tc.handleNodeUpdate(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
		}
	}
}
```
- One goroutine takes nodeUpdate items off the NoExecuteTaintManager's nodeUpdateQueue and pushes them into tc.nodeUpdateChannel.
- Another goroutine takes podUpdate items off the podUpdateQueue and pushes them into tc.podUpdateChannel.
- The main loop prioritizes node updates: tc.handleNodeUpdate is called until the node-update channel is drained, and only then is tc.handlePodUpdate called to process a podUpdate.
I will cover NoExecuteTaintManager's handleNodeUpdate and handlePodUpdate in a dedicated post on the Taint Manager, so I won't go deeper here.
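The prioritized draining in the last step is a plain Go pattern that is easy to see in isolation. Here is a minimal, self-contained sketch (not the real NoExecuteTaintManager; the channel names and printed strings are made up):

```go
package main

import (
	"fmt"
	"time"
)

// Demonstrates the "drain one channel with priority before handling another" pattern:
// before handling a pod update, empty everything pending on the node-update channel.
func main() {
	nodeUpdates := make(chan string, 10)
	podUpdates := make(chan string, 10)

	// Pretend some updates arrived while we were busy.
	nodeUpdates <- "node-a"
	nodeUpdates <- "node-b"
	podUpdates <- "pod-x"

	stop := time.After(200 * time.Millisecond)
	for {
		select {
		case <-stop:
			return
		case n := <-nodeUpdates:
			fmt.Println("handle node update:", n)
		case p := <-podUpdates:
			// Empty the node queue first, then process the pod update.
		priority:
			for {
				select {
				case n := <-nodeUpdates:
					fmt.Println("handle node update:", n)
				default:
					break priority
				}
			}
			fmt.Println("handle pod update:", p)
		}
	}
}
```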
doTaintingPass
If useTaintBasedEvictions is true, i.e. TaintBasedEvictions=true is set in --feature-gates (the default is TaintBasedEvictions=false), doTaintingPass is invoked every nodeEvictionPeriod (100ms).
doTaintingPass checks whether a node's ready condition is NotReady or Unknown and calls the apiserver to add the corresponding taint to the node: node.alpha.kubernetes.io/notReady or node.alpha.kubernetes.io/unreachable.
```go
if nc.useTaintBasedEvictions {
	// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
	// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
	go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
}
```
doEvictionPass
If useTaintBasedEvictions is false (the default, since TaintBasedEvictions=false), doEvictionPass is invoked every nodeEvictionPeriod (100ms) instead.
```go
} else {
	// Managing eviction of nodes:
	// When we delete pods off a node, if the node was not empty at the time we then
	// queue an eviction watcher. If we hit an error, retry deletion.
	go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
```
Let's look at the code of doEvictionPass:
pkg/controller/node/nodecontroller.go:481
```go
func (nc *NodeController) doEvictionPass() {
	...
	// Iterate over every zone's pod evictor: take node names from its queue and call
	// deletePods to delete all pods on that node (except pods that belong to a DaemonSet).
	for k := range nc.zonePodEvictor {
		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
		nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
			node, err := nc.nodeLister.Get(value.Value)
			...
			nodeUid, _ := value.UID.(string)
			remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
			...
			return true, 0
		})
	}
}
```
The logic of doEvictionPass:
- Iterate over every zone's pod evictor and take node names from its queue.
- Call deletePods to delete all pods on that node (except pods that belong to a DaemonSet).
The code of deletePods is as follows:
pkg/controller/node/controller_utils.go:49
```go
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
	...
	// List all pod objects from the apiserver.
	pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(options)
	...
	// Iterate over the pods and pick out the ones running on this node.
	for _, pod := range pods.Items {
		// Defensive check, also needed for tests.
		if pod.Spec.NodeName != nodeName {
			continue
		}
		...
		// if the pod has already been marked for deletion, we still return true that there are remaining pods.
		if pod.DeletionGracePeriodSeconds != nil {
			remaining = true
			continue
		}
		// if the pod is managed by a daemonset, ignore it
		_, err := daemonStore.GetPodDaemonSets(&pod)
		if err == nil { // No error means at least one daemonset was found
			continue
		}
		...
		// Call the apiserver to delete the pod.
		if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
			return false, err
		}
		remaining = true
	}
	...
	return remaining, nil
}
```
The main logic of deletePods:
- List all pod objects from the apiserver.
- Iterate over them and pick out the pods running on the given node.
- If a pod is already marked for deletion (pod.DeletionGracePeriodSeconds != nil), skip it, but still count it as remaining.
- If the pod is managed by a DaemonSet, skip it.
- Otherwise, delete the pod through the apiserver.
A condensed, standalone sketch of this filtering follows.
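As referenced above, here is a condensed, standalone sketch of that filtering. Plain structs stand in for the real API objects, and the ownedByDaemonSet field replaces the DaemonSet store lookup; none of these names exist in the Kubernetes code.

```go
package main

import "fmt"

// pod is a simplified stand-in for the real Pod object.
type pod struct {
	name             string
	nodeName         string
	deleting         bool // stands in for pod.DeletionGracePeriodSeconds != nil
	ownedByDaemonSet bool // stands in for the DaemonSet store lookup
}

// podsToEvict mirrors the selection deletePods performs before calling the apiserver.
func podsToEvict(all []pod, nodeName string) (toDelete []pod, remaining bool) {
	for _, p := range all {
		if p.nodeName != nodeName {
			continue // pod runs on another node
		}
		if p.deleting {
			remaining = true // already being deleted, but still counts as remaining
			continue
		}
		if p.ownedByDaemonSet {
			continue // DaemonSet pods are left alone
		}
		toDelete = append(toDelete, p)
		remaining = true
	}
	return toDelete, remaining
}

func main() {
	pods := []pod{
		{name: "web-1", nodeName: "node-1"},
		{name: "ds-agent", nodeName: "node-1", ownedByDaemonSet: true},
		{name: "web-2", nodeName: "node-2"},
	}
	del, remaining := podsToEvict(pods, "node-1")
	fmt.Println(del, remaining) // only web-1 is selected for deletion; remaining == true
}
```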
With that, all the main logic of the Node Controller's Run method has been covered.
The parts that involve the Taint Manager's Run were not analyzed in depth here; I will devote a follow-up post to the Taint Manager.