kubelet startup: starting the oomWatcher module


Based on kubernetes v1.18.6

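oomWatcher is a kubelet submodule that is started during the kubelet startup flow. It interacts with the cAdvisor module to collect information about processes on the host that were killed by the OOM killer. In the startup-flow figure of the original post, this stage is the part marked in red.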

oomWatcher watches for system OOM events and records them as events on the node.

When the oomWatcher module starts, it launches two goroutines: one produces OOM events and the other consumes them. The two goroutines exchange data over a channel.
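The producer/consumer arrangement described above can be sketched roughly as follows. This is a minimal illustration, not the kubelet code: the oomEvent type, the watch function, and the message text are all made up for the example, and the real consumer would record a node event instead of building a string.

```go
package main

import (
	"fmt"
	"sync"
)

// oomEvent is a hypothetical, simplified stand-in for the OOM record
// that is passed between the two goroutines.
type oomEvent struct {
	Pid         int
	ProcessName string
}

// watch starts one producer and one consumer goroutine connected by a
// channel and returns the messages the consumer recorded.
func watch(source []oomEvent) []string {
	events := make(chan oomEvent, 10)
	var recorded []string
	var wg sync.WaitGroup
	wg.Add(1)

	// Producer: pushes every event into the channel, then closes it.
	go func() {
		defer close(events)
		for _, e := range source {
			events <- e
		}
	}()

	// Consumer: drains the channel until it is closed.
	go func() {
		defer wg.Done()
		for e := range events {
			recorded = append(recorded,
				fmt.Sprintf("system OOM, victim process: %s, pid: %d", e.ProcessName, e.Pid))
		}
	}()

	wg.Wait() // the consumer is done, so reading recorded is safe
	return recorded
}

func main() {
	for _, msg := range watch([]oomEvent{{Pid: 1234, ProcessName: "stress"}}) {
		fmt.Println(msg)
	}
}
```

Closing the channel from the producer side is what lets the consumer's range loop end cleanly in this sketch; a long-running watcher would normally keep both goroutines alive for the lifetime of the process.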

Source code implementation

The cAdvisor module reads /dev/kmsg and parses out the OOM container information: pid, OOM timestamp, container name, name of the process inside the container, and so on.

It first filters out the OOM records. The filter rule is a regular-expression match for lines that contain the string invoked oom-killer:.

It then builds the corresponding object using regular-expression matching plus string splitting.
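A rough sketch of that filter-then-parse step is shown below. Only the invoked oom-killer: marker comes from the text above. The second pattern, the helper names, and the sample log lines are assumptions made for illustration and are not copied from cAdvisor.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

var (
	// Marker named in the text above.
	oomLineRE = regexp.MustCompile(`invoked oom-killer:`)
	// Assumed pattern for the line that names the killed process.
	killedRE = regexp.MustCompile(`Killed process (\d+) \((.+?)\)`)
)

// oomInfo is a hypothetical holder for the parsed fields.
type oomInfo struct {
	Pid         int
	ProcessName string
}

// isOOMLine reports whether a kernel log line starts an OOM report.
func isOOMLine(line string) bool {
	return oomLineRE.MatchString(line)
}

// invokerName uses plain string splitting to get the text in front of
// the marker, i.e. the process that triggered the OOM killer.
func invokerName(line string) string {
	return strings.SplitN(line, " invoked oom-killer:", 2)[0]
}

// parseKilled extracts pid and process name with a regular expression.
func parseKilled(line string) (oomInfo, bool) {
	m := killedRE.FindStringSubmatch(line)
	if m == nil {
		return oomInfo{}, false
	}
	pid, err := strconv.Atoi(m[1])
	if err != nil {
		return oomInfo{}, false
	}
	return oomInfo{Pid: pid, ProcessName: m[2]}, true
}

func main() {
	// Made-up sample lines, shaped like kernel OOM messages.
	lines := []string{
		"usb 1-1: new high-speed USB device",
		"stress invoked oom-killer: gfp_mask=0x0, order=0",
		"Killed process 1234 (stress) total-vm:1024kB",
	}
	for _, l := range lines {
		if isOOMLine(l) {
			fmt.Println("triggered by:", invokerName(l))
		}
		if info, ok := parseKilled(l); ok {
			fmt.Printf("killed pid=%d name=%s\n", info.Pid, info.ProcessName)
		}
	}
}
```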

The OOM process information produced by the cAdvisor module is finally recorded in the node's events, where it can be looked up.

Kubelet source code analysis: volume manager


 kubernetes version: v1.12.1


Kubernetes uses Volume Plugins to implement operations such as mounting storage volumes.




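The kubelet calls the VolumeManager to prepare storage devices for pods. Once a storage device is ready, it is mounted on the node where the pod runs, and when the container starts it is mounted into the directory the container specifies. Storage that is no longer in use is unmounted and removed.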


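VolumeManager interface

  • The component that runs inside the kubelet and makes storage ready. Its main job is mount/unmount (attach/detach is optional).

  • Volume operations only happen after a pod has been scheduled onto this node, so the trigger is the kubelet (strictly speaking, the pod manager inside the kubelet): mounts are triggered by the storage declared in the pod specs held by the Pod Manager.

  • The kubelet watches for pods scheduled to this node and caches them in the Pod Manager. The VolumeManager reads the pods from the Pod Manager, resolves the state of their PVs/PVCs, works out the specific attach/detach and mount/umount operations, and then calls the plugin to carry them out.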


 
   
   
 
    // VolumeManager runs a set of asynchronous loops that figure out which volumes
    // need to be attached/mounted/unmounted/detached based on the pods scheduled on
    // this node and makes it so.
    type VolumeManager interface {
        Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

        WaitForAttachAndMount(pod *v1.Pod) error

        GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

        GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

        GetVolumesInUse() []v1.UniqueVolumeName

        ReconcilerStatesHasBeenSynced() bool

        VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

        MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
    }

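volumeManager struct

   Path: pkg/kubelet/volumemanager/volume_manager.go

  • The volumeManager struct implements the VolumeManager interface. Two fields deserve attention:

  • desiredStateOfWorld: the desired state, i.e. which volumes need to be attached and which pods reference each volume

  • actualStateOfWorld: the actual state, i.e. which volumes are already attached to this node and which pods have each volume mounted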


   
     
     
   
    // volumeManager implements the VolumeManager interface
    type volumeManager struct {
        // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
        // communicate with the API server to fetch PV and PVC objects
        kubeClient clientset.Interface

        // volumePluginMgr is the volume plugin manager used to access volume
        // plugins. It must be pre-initialized.
        volumePluginMgr *volume.VolumePluginMgr

        // desiredStateOfWorld is a data structure containing the desired state of
        // the world according to the volume manager: i.e. what volumes should be
        // attached and which pods are referencing the volumes).
        // The data structure is populated by the desired state of the world
        // populator using the kubelet pod manager.
        desiredStateOfWorld cache.DesiredStateOfWorld

        // actualStateOfWorld is a data structure containing the actual state of
        // the world according to the manager: i.e. which volumes are attached to
        // this node and what pods the volumes are mounted to.
        // The data structure is populated upon successful completion of attach,
        // detach, mount, and unmount actions triggered by the reconciler.
        actualStateOfWorld cache.ActualStateOfWorld

        // operationExecutor is used to start asynchronous attach, detach, mount,
        // and unmount operations.
        operationExecutor operationexecutor.OperationExecutor

        // reconciler runs an asynchronous periodic loop to reconcile the
        // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
        // detach, mount, and unmount operations using the operationExecutor.
        reconciler reconciler.Reconciler

        // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
        // populate the desiredStateOfWorld using the kubelet PodManager.
        desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
    }

1. volumeManager initialization

    Path: the NewMainKubelet function in pkg/kubelet/kubelet.go

    This initializes the volumeManager struct shown above:

   
     
     
   
    // setup volumeManager
    klet.volumeManager = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.statusManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        experimentalCheckNodeCapabilitiesBeforeMount,
        keepTerminatedPodVolumes)

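1.1 Starting the VolumeManager Run method

  • One goroutine runs the desiredStateOfWorldPopulator's Run function, which updates the DesiredStateOfWorld from the pod information synced from the apiserver

  • One goroutine runs the reconciler's Run function. The reconciler coordinates the desired and actual states and is responsible for driving the actual state toward the desired state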

   
     
     
   
    func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        defer runtime.HandleCrash()

        go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
        glog.V(2).Infof("The desired_state_of_world populator starts")

        glog.Infof("Starting Kubelet Volume Manager")
        go vm.reconciler.Run(stopCh)

        metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

        <-stopCh
        glog.Infof("Shutting down Kubelet Volume Manager")
    }

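1.2 desiredStateOfWorldPopulator Run function

Updates the DesiredStateOfWorld from the pod information synced from the apiserver. The main work is done by the populatorLoop function, which is covered in section 2.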

   
     
     
   
    func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        // Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
        glog.Infof("Desired state populator starts to run")
        wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
            done := sourcesReady.AllReady()
            dswp.populatorLoop()
            return done, nil
        }, stopCh)
        dswp.hasAddedPodsLock.Lock()
        dswp.hasAddedPods = true
        dswp.hasAddedPodsLock.Unlock()
        wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
    }
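The two phases of this Run function (loop until every source is ready, flag that pods have been added, then keep looping) can be approximated with the standard library alone. The sketch below is a simplification under stated assumptions: runPopulator, simulate, and their parameters are hypothetical names, a time.Ticker stands in for the wait helpers, and unlike the original it returns immediately if stop is closed during the first phase.

```go
package main

import (
	"fmt"
	"time"
)

// runPopulator is a simplified sketch of a two-phase populate loop.
// Phase 1 repeats loop() until ready() was already true before a pass.
// Phase 2 calls onSynced once and keeps running loop() until stop closes.
func runPopulator(ready func() bool, loop func(), onSynced func(),
	interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		// Readiness is sampled before the pass, so one complete pass
		// always runs after all sources have become ready.
		done := ready()
		loop()
		if done {
			break
		}
		select {
		case <-stop:
			return
		case <-ticker.C:
		}
	}

	onSynced() // plays the role of setting hasAddedPods = true

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			loop()
		}
	}
}

// simulate runs the sketch with a source that becomes ready on the
// readyAfter-th check and stops as soon as the populator has synced.
func simulate(readyAfter int) (loops int, synced bool) {
	checks := 0
	stop := make(chan struct{})
	runPopulator(
		func() bool { checks++; return checks >= readyAfter },
		func() { loops++ },
		func() { synced = true; close(stop) },
		time.Millisecond, stop,
	)
	return loops, synced
}

func main() {
	loops, synced := simulate(3)
	fmt.Println("at least three passes:", loops >= 3, "synced:", synced)
}
```

Sampling readiness before the pass rather than after it is the detail worth noticing: it is what guarantees that the flag is only set after a full pass over a complete pod list.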

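1.3 reconciler Run function

The reconciler coordinates the desired and actual states and drives the actual state toward the desired state. The reconcile function is covered in section 4.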

   
     
     
   
    func (rc *reconciler) Run(stopCh <-chan struct{}) {
        wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
    }

    func (rc *reconciler) reconciliationLoopFunc() func() {
        return func() {
            rc.reconcile()

            // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
            // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
            // desired state of world does not contain a complete list of pods.
            if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
                glog.Infof("Reconciler: start to sync state")
                rc.sync()
            }
        }
    }

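2. populatorLoop function

populatorLoop first calls findAndAddNewPods, which gets all pods from the podManager and calls processPodVolumes to update the desiredStateOfWorld. It then calls findAndRemoveDeletedPods, but only at a limited rate, because that call queries the container runtime and is expensive.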

   
     
     
   
    func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
        dswp.findAndAddNewPods()

        // findAndRemoveDeletedPods() calls out to the container runtime to
        // determine if the containers for a given pod are terminated. This is
        // an expensive operation, therefore we limit the rate that
        // findAndRemoveDeletedPods() is called independently of the main
        // populator loop.
        if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
            glog.V(5).Infof(
                "Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
                dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
                dswp.getPodStatusRetryDuration)

            return
        }

        dswp.findAndRemoveDeletedPods()
    }
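The rate limit in populatorLoop boils down to comparing the time since the last run against a minimum interval. A minimal sketch of that idea follows. The throttled type and countRuns helper are hypothetical, and the sketch updates the timestamp itself after each run, which is an assumption about where that bookkeeping lives.

```go
package main

import (
	"fmt"
	"time"
)

// throttled wraps an expensive function so that it runs at most once
// per minInterval. All names here are illustrative.
type throttled struct {
	last        time.Time
	minInterval time.Duration
	fn          func()
}

// maybeRun calls fn only if minInterval has elapsed since the last run
// and reports whether fn actually ran.
func (t *throttled) maybeRun(now time.Time) bool {
	if now.Sub(t.last) < t.minInterval {
		return false // too soon, skip the expensive call
	}
	t.fn()
	t.last = now
	return true
}

// countRuns feeds a series of timestamps (millisecond offsets from a
// fixed base) through the throttle and counts how often fn ran.
func countRuns(offsetsMs []int, minMs int) int {
	runs := 0
	t := &throttled{
		minInterval: time.Duration(minMs) * time.Millisecond,
		fn:          func() { runs++ },
	}
	base := time.Unix(1000, 0)
	for _, off := range offsetsMs {
		t.maybeRun(base.Add(time.Duration(off) * time.Millisecond))
	}
	return runs
}

func main() {
	// Ticks every 500 ms with a 1 s minimum interval.
	fmt.Println(countRuns([]int{0, 500, 1000, 1500, 2500}, 1000))
}
```

Passing the current time in as an argument, instead of calling time.Now inside maybeRun, keeps the sketch deterministic and easy to test.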

2.1 findAndAddNewPods function

  • Gets all pods from the podManager

  • Calls processPodVolumes to update the desiredStateOfWorld (covered in section 3)

   
     
     
   
    // Iterate through all pods and add to desired state of world if they don't
    // exist but should
    func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
        // Map unique pod name to outer volume name to MountedVolume.
        mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
        if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
            for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
                mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
                if !exist {
                    mountedVolumes = make(map[string]cache.MountedVolume)
                    mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
                }
                mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
            }
        }

        processedVolumesForFSResize := sets.NewString()
        for _, pod := range dswp.podManager.GetPods() {
            if dswp.isPodTerminated(pod) {
                // Do not (re)add volumes for terminated pods
                continue
            }
            dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
        }
    }

3. processPodVolumes function

Updates the desiredStateOfWorld.

3.1 If the pod is already in the processedPods map, nothing needs to be done


   
     
     
   
    uniquePodName := util.GetUniquePodName(pod)
    if dswp.podPreviouslyProcessed(uniquePodName) {
        return
    }
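The processed-pods check is essentially a set guarded by a lock, so that a pod whose volumes were already added is skipped on later passes. The sketch below shows one way such a set could look. The type and method names are hypothetical and the real implementation may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// processedPods is a hypothetical lock-protected set of pod names
// whose volumes have already been added to the desired state.
type processedPods struct {
	mu   sync.RWMutex
	pods map[string]bool
}

func newProcessedPods() *processedPods {
	return &processedPods{pods: make(map[string]bool)}
}

// previouslyProcessed reports whether the pod was marked before.
func (p *processedPods) previouslyProcessed(name string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.pods[name]
}

// markProcessed records the pod so that later passes skip it.
func (p *processedPods) markProcessed(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pods[name] = true
}

// deleteProcessed forgets the pod, e.g. once it has been removed.
func (p *processedPods) deleteProcessed(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pods, name)
}

func main() {
	p := newProcessedPods()
	fmt.Println(p.previouslyProcessed("pod-uid-1")) // not seen yet
	p.markProcessed("pod-uid-1")
	fmt.Println(p.previouslyProcessed("pod-uid-1")) // now skipped
}
```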

3.2 Add the volumeDevices and volumeMounts of every container in the pod to the maps

    volumeDevices    <[]Object>
      volumeDevices is the list of block devices to be used by the container.
      This is an alpha feature and may change in the future.

    volumeMounts    <[]Object>
      Pod volumes to mount into the container's filesystem. Cannot be updated.


   
     
     
   
    func (dswp *desiredStateOfWorldPopulator) makeVolumeMap(containers []v1.Container) (map[string]bool, map[string]bool) {
        volumeDevicesMap := make(map[string]bool)
        volumeMountsMap := make(map[string]bool)

        for _, container := range containers {
            if container.VolumeMounts != nil {
                for _, mount := range container.VolumeMounts {
                    volumeMountsMap[mount.Name] = true
                }
            }
            // TODO: remove feature gate check after no longer needed
            if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) &&
                container.VolumeDevices != nil {
                for _, device := range container.VolumeDevices {
                    volumeDevicesMap[device.Name] = true
                }
            }
        }

        return volumeMountsMap, volumeDevicesMap
    }

3.3 AddPodToVolume function

3.3.1 Call FindPluginBySpec to find the volume plugin from the volume spec


   
     
     
   
    volumePlugin, err := dsw.volumePluginMgr.FindPluginBySpec(volumeSpec)
    if err != nil || volumePlugin == nil {
        return "", fmt.Errorf(
            "failed to get Plugin from volumeSpec for volume %q err=%v",
            volumeSpec.Name(),
            err)
    }

3.3.2 The isAttachableVolume function calls NewAttacher() on the plugin to decide whether the volume is attachable

   
     
     
   
    // The unique volume name used depends on whether the volume is attachable
    // or not.
    attachable := dsw.isAttachableVolume(volumeSpec)
    if attachable {
        // For attachable volumes, use the unique volume name as reported by
        // the plugin.
        volumeName, err =
            util.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
        if err != nil {
            return "", fmt.Errorf(
                "failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
                volumeSpec.Name(),
                volumePlugin.GetPluginName(),
                err)
        }
    } else {
        // For non-attachable volumes, generate a unique name based on the pod
        // namespace and name and the name of the volume within the pod.
        volumeName = util.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec)
    }
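The practical effect of this branch is that an attachable volume gets the same unique name no matter which pod uses it, while a non-attachable volume gets a name that is specific to one pod. The sketch below illustrates only that property. The uniqueVolumeName function and the exact name formats are assumptions for the example and should not be read as the formats the util package produces.

```go
package main

import "fmt"

// uniqueVolumeName is a hypothetical helper. For attachable volumes the
// name depends only on the plugin and the plugin-reported volume name.
// For non-attachable volumes the pod name is mixed in as well.
func uniqueVolumeName(attachable bool, pluginName, pluginVolumeName, podName, volumeSpecName string) string {
	if attachable {
		return pluginName + "/" + pluginVolumeName
	}
	return fmt.Sprintf("%s/%s-%s", pluginName, podName, volumeSpecName)
}

func main() {
	// Two pods sharing one attachable disk resolve to the same key.
	fmt.Println(uniqueVolumeName(true, "example.com/disk", "disk-1", "pod-a", "data"))
	fmt.Println(uniqueVolumeName(true, "example.com/disk", "disk-1", "pod-b", "data"))

	// Two pods with a same-named non-attachable volume get separate keys.
	fmt.Println(uniqueVolumeName(false, "example.com/scratch", "", "pod-a", "cache"))
	fmt.Println(uniqueVolumeName(false, "example.com/scratch", "", "pod-b", "cache"))
}
```

Because the desired state is keyed by this name, the first case lets several pods hang off one volume entry, and the second keeps per-pod volumes from colliding.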

3.3.3 Record the relationship between the volume and the pod

   
     
     
   
    if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists {
        dsw.volumesToMount[volumeName] = volumeToMount{
            volumeName:              volumeName,
            podsToMount:             make(map[types.UniquePodName]podToMount),
            pluginIsAttachable:      attachable,
            pluginIsDeviceMountable: deviceMountable,
            volumeGidValue:          volumeGidValue,
            reportedInUse:           false,
        }
    }

    // Create new podToMount object. If it already exists, it is refreshed with
    // updated values (this is required for volumes that require remounting on
    // pod update, like Downward API volumes).
    dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
        podName:             podName,
        pod:                 pod,
        volumeSpec:          volumeSpec,
        outerVolumeSpecName: outerVolumeSpecName,
    }
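The bookkeeping above is a two-level map: volume name to volume entry, and inside each entry pod name to pod entry. A stripped-down, self-contained version of that structure is sketched below. The field set is reduced and every name in it is a simplified stand-in rather than the real type.

```go
package main

import "fmt"

// podToMount and volumeToMount are reduced stand-ins for the entries
// kept in the desired state.
type podToMount struct {
	podName             string
	outerVolumeSpecName string
}

type volumeToMount struct {
	volumeName         string
	podsToMount        map[string]podToMount
	pluginIsAttachable bool
}

type desiredState struct {
	volumesToMount map[string]volumeToMount
}

func newDesiredState() *desiredState {
	return &desiredState{volumesToMount: make(map[string]volumeToMount)}
}

// addPodToVolume creates the volume entry on first use and then adds
// or refreshes the pod entry underneath it.
func (d *desiredState) addPodToVolume(volumeName, podName, outerName string, attachable bool) {
	if _, exists := d.volumesToMount[volumeName]; !exists {
		d.volumesToMount[volumeName] = volumeToMount{
			volumeName:         volumeName,
			podsToMount:        make(map[string]podToMount),
			pluginIsAttachable: attachable,
		}
	}
	// Writing through the inner map works even though the outer map
	// stores struct values, because the inner map is a reference.
	d.volumesToMount[volumeName].podsToMount[podName] = podToMount{
		podName:             podName,
		outerVolumeSpecName: outerName,
	}
}

// podsForVolume returns how many pods reference the volume.
func (d *desiredState) podsForVolume(volumeName string) int {
	return len(d.volumesToMount[volumeName].podsToMount)
}

func main() {
	d := newDesiredState()
	d.addPodToVolume("vol-1", "pod-a", "data", true)
	d.addPodToVolume("vol-1", "pod-b", "data", true)
	d.addPodToVolume("vol-1", "pod-a", "data", true) // refresh, not a new entry
	fmt.Println(len(d.volumesToMount), d.podsForVolume("vol-1"))
}
```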

3.3.4 Mark the pod name as processed and flag a remount in the actual_state_of_world

   
     
     
   
    // some of the volume additions may have failed, should not mark this pod as fully processed
    if allVolumesAdded {
        dswp.markPodProcessed(uniquePodName)
        // New pod has been synced. Re-mount all volumes that need it
        // (e.g. DownwardAPI)
        dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
    }

4. reconcile function

Path: pkg/kubelet/volumemanager/reconciler/reconciler.go


   
     
     
   
    func (rc *reconciler) reconcile() {
        // Unmounts are triggered before mounts so that a volume that was
        // referenced by a pod that was deleted and is now referenced by another
        // pod is unmounted from the first pod before being mounted to the new
        // pod.

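4.1 Unmount volumes that are actually mounted but no longer match the desired state

This is handled by the UnmountVolume function. Volumes are either filesystem or block volumes. A volume has to be unmounted when it appears in the actualStateOfWorld but the desiredStateOfWorld no longer contains it.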


   
     
     
   
    // Ensure volumes that should be unmounted are unmounted.
    for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
            // Volume is mounted, unmount it
            glog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
            err := rc.operationExecutor.UnmountVolume(
                mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
            if err != nil &&
                !nestedpendingoperations.IsAlreadyExists(err) &&
                !exponentialbackoff.IsExponentialBackoff(err) {
                // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                // Log all other errors.
                glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
            }
            if err == nil {
                glog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
            }
        }
    }
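Stripped of logging and error handling, the unmount pass is a set difference: everything that is mounted according to the actual state but absent from the desired state. The following sketch shows that core with plain slices and maps. The mount type and volumesToUnmount function are hypothetical simplifications.

```go
package main

import "fmt"

// mount identifies one (pod, volume) pair. It is a simplified stand-in
// for the mounted-volume records kept by the kubelet.
type mount struct {
	Pod    string
	Volume string
}

// volumesToUnmount returns every actual mount that the desired state
// no longer contains, in the order they appear in actual.
func volumesToUnmount(actual []mount, desired map[mount]bool) []mount {
	var out []mount
	for _, m := range actual {
		if !desired[m] {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	actual := []mount{
		{Pod: "pod-a", Volume: "vol-1"},
		{Pod: "pod-b", Volume: "vol-1"}, // pod-b was deleted
		{Pod: "pod-c", Volume: "vol-2"},
	}
	desired := map[mount]bool{
		{Pod: "pod-a", Volume: "vol-1"}: true,
		{Pod: "pod-c", Volume: "vol-2"}: true,
	}
	for _, m := range volumesToUnmount(actual, desired) {
		fmt.Printf("unmount %s from %s\n", m.Volume, m.Pod)
	}
}
```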

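4.2 Get the volumes that need to be mounted from the desiredStateOfWorld

PodExistsInVolume is called on the actualStateOfWorld to compare against the actual pod name:

If attachedVolumes does not contain the volumeName, the result is a volumeNotAttachedError.

If mountedPods contains the podName, podObj.remountRequired yields newRemountRequiredError and podObj.fsResizeRequired yields newFsResizeRequiredError.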


   
     
     
   
    // Ensure volumes that should be attached/mounted are attached/mounted.
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        volumeToMount.DevicePath = devicePath
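The reconciler decides what to do next from the kind of error this lookup returns. The sketch below shows one way to model such a lookup with typed errors. It is an illustration under assumptions: the error types, the actualState layout, and the use of errors.As are choices made for this example, and the device path return value is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// volumeNotAttachedError signals that the volume is unknown to the
// actual state. The type is a hypothetical stand-in.
type volumeNotAttachedError struct{ volumeName string }

func (e volumeNotAttachedError) Error() string {
	return fmt.Sprintf("volume %q is not attached", e.volumeName)
}

// remountRequiredError signals that the pod has the volume mounted but
// it must be mounted again.
type remountRequiredError struct{ volumeName, podName string }

func (e remountRequiredError) Error() string {
	return fmt.Sprintf("volume %q needs a remount for pod %q", e.volumeName, e.podName)
}

func isVolumeNotAttached(err error) bool {
	var target volumeNotAttachedError
	return errors.As(err, &target)
}

func isRemountRequired(err error) bool {
	var target remountRequiredError
	return errors.As(err, &target)
}

// actualState maps volume name to pod name to a remountRequired flag.
type actualState map[string]map[string]bool

// podExistsInVolume reports whether the pod has the volume mounted and
// returns a typed error for the cases the caller must act on.
func (a actualState) podExistsInVolume(podName, volumeName string) (bool, error) {
	pods, attached := a[volumeName]
	if !attached {
		return false, volumeNotAttachedError{volumeName}
	}
	remount, mounted := pods[podName]
	if mounted && remount {
		return true, remountRequiredError{volumeName, podName}
	}
	return mounted, nil
}

func main() {
	state := actualState{"vol-1": {"pod-a": false, "pod-b": true}}
	_, err := state.podExistsInVolume("pod-a", "vol-2")
	fmt.Println(isVolumeNotAttached(err), err)
	_, err = state.podExistsInVolume("pod-b", "vol-1")
	fmt.Println(isRemountRequired(err), err)
}
```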

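4.3 When err is a volumeNotAttachedError

The volume is not attached, i.e. attachedVolumes in the actualStateOfWorld struct does not contain the volumeName. If controllerAttachDetachEnabled is set or the plugin is not attachable, the kubelet calls VerifyControllerAttachedVolume and waits for the controller. Otherwise it calls AttachVolume itself.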

   
     
     
   
        if cache.IsVolumeNotAttachedError(err) {
            if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
                // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
                // for controller to finish attaching volume.
                glog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
                err := rc.operationExecutor.VerifyControllerAttachedVolume(
                    volumeToMount.VolumeToMount,
                    rc.nodeName,
                    rc.actualStateOfWorld)
                if err != nil &&
                    !nestedpendingoperations.IsAlreadyExists(err) &&
                    !exponentialbackoff.IsExponentialBackoff(err) {
                    // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                    // Log all other errors.
                    glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
                }
                if err == nil {
                    glog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
                }
            } else {
                // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
                // so attach it
                volumeToAttach := operationexecutor.VolumeToAttach{
                    VolumeName: volumeToMount.VolumeName,
                    VolumeSpec: volumeToMount.VolumeSpec,
                    NodeName:   rc.nodeName,
                }
                glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
                err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
                if err != nil &&
                    !nestedpendingoperations.IsAlreadyExists(err) &&
                    !exponentialbackoff.IsExponentialBackoff(err) {
                    // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                    // Log all other errors.
                    glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
                }
                if err == nil {
                    glog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
                }
            }
        }
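Once the logging is removed, the branch above is a small decision over two booleans. The sketch below isolates that decision so it can be read as a truth table. The action type and decide function are hypothetical; only the two operation names are taken from the listing.

```go
package main

import "fmt"

// action names the operation the reconciler would start.
type action string

const (
	verifyControllerAttached action = "VerifyControllerAttachedVolume"
	attachVolume             action = "AttachVolume"
)

// decide mirrors the condition in the listing: the kubelet attaches a
// volume itself only when controller attach/detach is disabled and the
// plugin is attachable. In every other case it verifies the attachment.
func decide(controllerAttachDetachEnabled, pluginIsAttachable bool) action {
	if controllerAttachDetachEnabled || !pluginIsAttachable {
		return verifyControllerAttached
	}
	return attachVolume
}

func main() {
	for _, ctrl := range []bool{true, false} {
		for _, attachable := range []bool{true, false} {
			fmt.Printf("controllerAttachDetach=%v attachable=%v -> %s\n",
				ctrl, attachable, decide(ctrl, attachable))
		}
	}
}
```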

Original link:

https://blog.csdn.net/zhonglinzhang/article/details/82800287

