kubelet启动之启动oomWatcher模块
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kubelet启动之启动oomWatcher模块相关的知识,希望对你有一定的参考价值。
参考技术A 基于 kubernetes v1.18.6oomWatcher 是一个 kubelet 子模块,在 kubelet 启动流程内启动。通过与 cAdvisor 模块交互采集主机 oom 进程信息,启动阶段如下图(红色部分)
oomWatcher 记录系统 oom 并记录到节点的 event
oomWatcher 模块在启动的时候会 fork 出两个 goroutine ,一个作为 oom 事件生产者,一个作为 oom 事件消费者。两个 goroutine 采用 channel 进行数据传输。
源码实现
cAdvisor 模块从 /dev/kmsg 读取数据解析出 oom 容器信息: pid 、 oom 时间戳、容器名称、容器内进程名称等。其中字段值如下:
过滤出 oom 信息,过滤规则:正则匹配包含 invoked oom-killer: 字符串的行。
随后利用正则匹配+字符串分割生成对应对象
cAdvisor 模块生产出的 oom 进程信息,最终被记录至节点的 event ,可通过以下方式获取相关信息。
Kubelet源码分析之volume manager源码分析
kubernetes version: v1.12.1
kubernetes采用Volume Plugins来实现存储卷的挂载等操作。
这图谁画的咋这么好呢,借来用用!!!
kubelet会调用VolumeManager,为pods准备存储设备,存储设备就绪会挂载存储设备到pod所在的节点上,并在容器启动的时候挂载在容器指定的目录中;同时,删除卸载不在使用的存储
VolumeManager接口
运行在kubelet 里让存储Ready的部件,主要是mount/unmount(attach/detach可选)
pod调度到这个node上后才会有卷的相应操作,所以它的触发端是kubelet(严格讲是kubelet里的pod manager),根据Pod Manager里pod spec里申明的存储来触发卷的挂载操作
Kubelet会监听到调度到该节点上的pod声明,会把pod缓存到Pod Manager中,VolumeManager通过Pod Manager获取PV/PVC的状态,并进行分析出具体的attach/detach、mount/umount, 操作然后调用plugin进行相应的业务处理
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
WaitForAttachAndMount(pod *v1.Pod) error
GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
GetVolumesInUse() []v1.UniqueVolumeName
ReconcilerStatesHasBeenSynced() bool
VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
volumeManager结构体
路径:pkg/kubelet/volumemanager/volume_manager.go
volumeManager结构体实现了VolumeManager接口,主要有两个需要注意:
desiredStateOfWorld:预期状态,volume需要被attach,哪些pods引用这个volume
actualStateOfWorld:实际状态,volume已经被atttach哪个node,哪个pod mount volume
/ volumeManager implements the VolumeManager interface
type volumeManager struct {
// kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
// communicate with the API server to fetch PV and PVC objects
kubeClient clientset.Interface
// volumePluginMgr is the volume plugin manager used to access volume
// plugins. It must be pre-initialized.
volumePluginMgr *volume.VolumePluginMgr
// desiredStateOfWorld is a data structure containing the desired state of
// the world according to the volume manager: i.e. what volumes should be
// attached and which pods are referencing the volumes).
// The data structure is populated by the desired state of the world
// populator using the kubelet pod manager.
desiredStateOfWorld cache.DesiredStateOfWorld
// actualStateOfWorld is a data structure containing the actual state of
// the world according to the manager: i.e. which volumes are attached to
// this node and what pods the volumes are mounted to.
// The data structure is populated upon successful completion of attach,
// detach, mount, and unmount actions triggered by the reconciler.
actualStateOfWorld cache.ActualStateOfWorld
// operationExecutor is used to start asynchronous attach, detach, mount,
// and unmount operations.
operationExecutor operationexecutor.OperationExecutor
// reconciler runs an asynchronous periodic loop to reconcile the
// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
// detach, mount, and unmount operations using the operationExecutor.
reconciler reconciler.Reconciler
// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the desiredStateOfWorld using the kubelet PodManager.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}
1. volumeManager初始化
路径: pkg/kubelet/kubelet.go文件中NewMainKubelet函数
初始化VolumeManager结构体,结构体VolumeManager如上所示:
// setup volumeManager
klet.volumeManager = volumemanager.NewVolumeManager(
kubeCfg.EnableControllerAttachDetach,
nodeName,
klet.podManager,
klet.statusManager,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
kubeDeps.Mounter,
klet.getPodsDir(),
kubeDeps.Recorder,
experimentalCheckNodeCapabilitiesBeforeMount,
keepTerminatedPodVolumes)
1.1 启动VolumeManager的run方法
goroutine启动descriedStateOfWorldPopulator的Run函数,从apiserver同步到的pod信息,来更新DesiredStateOfWorld
goroutine启动reconciler的Run函数 ,预期状态和实际状态的协调者,负责调整实际状态至预期状态
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
glog.V(2).Infof("The desired_state_of_world populator starts")
glog.Infof("Starting Kubelet Volume Manager")
go vm.reconciler.Run(stopCh)
metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
<-stopCh
glog.Infof("Shutting down Kubelet Volume Manager")
}
1.2 desiredStateOfWorldPopulator Run函数
从apiserver同步到的pod信息,来更新DesiredStateOfWorld,主要函数为populatorLoop函数第2章节讲解
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
glog.Infof("Desired state populator starts to run")
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
done := sourcesReady.AllReady()
dswp.populatorLoop()
return done, nil
}, stopCh)
dswp.hasAddedPodsLock.Lock()
dswp.hasAddedPods = true
dswp.hasAddedPodsLock.Unlock()
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
1.3 reconciler Run函数
预期状态和实际状态的协调者,负责调整实际状态至预期状态,reconcile函数第4章节讲解
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
// desired state of world does not contain a complete list of pods.
if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
glog.Infof("Reconciler: start to sync state")
rc.sync()
}
}
}
2. populatorLoop函数
findAndAddNewPods函数主要是调用podManager获取所有pods,调用processPodVolumes更新desiredStateOfWorld
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
dswp.findAndAddNewPods()
// findAndRemoveDeletedPods() calls out to the container runtime to
// determine if the containers for a given pod are terminated. This is
// an expensive operation, therefore we limit the rate that
// findAndRemoveDeletedPods() is called independently of the main
// populator loop.
if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
glog.V(5).Infof(
"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
dswp.getPodStatusRetryDuration)
return
}
dswp.findAndRemoveDeletedPods()
}
2.1 findAndAddNewPods函数
调用podManager获取所有的pods
调用processPodVolumes去更新desiredStateOfWorld,第3章节讲解
// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
// Map unique pod name to outer volume name to MountedVolume.
mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
if !exist {
mountedVolumes = make(map[string]cache.MountedVolume)
mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
}
mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
}
}
processedVolumesForFSResize := sets.NewString()
for _, pod := range dswp.podManager.GetPods() {
if dswp.isPodTerminated(pod) {
// Do not (re)add volumes for terminated pods
continue
}
dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
}
}
3. processPodVolumes函数
更新desiredStateOfWorld
3.1 如果先前在processedPods map中,表示无需处理
uniquePodName := util.GetUniquePodName(pod)
if dswp.podPreviouslyProcessed(uniquePodName) {
return
}
3.2 对pod下所有container的volumeDevices与volumeMounts加入map中
volumeDevices <[]Object>
volumeDevices is the list of block devices to be used by the container.
This is an alpha feature and may change in the future.
volumeMounts <[]Object>
Pod volumes to mount into the container's filesystem. Cannot be updated.
func (dswp *desiredStateOfWorldPopulator) makeVolumeMap(containers []v1.Container) (map[string]bool, map[string]bool) {
volumeDevicesMap := make(map[string]bool)
volumeMountsMap := make(map[string]bool)
for _, container := range containers {
if container.VolumeMounts != nil {
for _, mount := range container.VolumeMounts {
volumeMountsMap[mount.Name] = true
}
}
// TODO: remove feature gate check after no longer needed
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) &&
container.VolumeDevices != nil {
for _, device := range container.VolumeDevices {
volumeDevicesMap[device.Name] = true
}
}
}
return volumeMountsMap, volumeDevicesMap
}
3.3 AddPodToVolume函数
3.3.1 调用FindPluginBySpec函数根据volume.spec找到volume plugin
volumePlugin, err := dsw.volumePluginMgr.FindPluginBySpec(volumeSpec)
if err != nil || volumePlugin == nil {
return "", fmt.Errorf(
"failed to get Plugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}
3.3.2 isAttachableVolume函数,调用NewAttacher()对plugin初始化
// The unique volume name used depends on whether the volume is attachable
// or not.
attachable := dsw.isAttachableVolume(volumeSpec)
if attachable {
// For attachable volumes, use the unique volume name as reported by
// the plugin.
volumeName, err =
util.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
}
} else {
// For non-attachable volumes, generate a unique name based on the pod
// namespace and name and the name of the volume within the pod.
volumeName = util.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec)
}
3.3.3 记录volume与pod之间的关系
if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists {
dsw.volumesToMount[volumeName] = volumeToMount{
volumeName: volumeName,
podsToMount: make(map[types.UniquePodName]podToMount),
pluginIsAttachable: attachable,
pluginIsDeviceMountable: deviceMountable,
volumeGidValue: volumeGidValue,
reportedInUse: false,
}
}
// Create new podToMount object. If it already exists, it is refreshed with
// updated values (this is required for volumes that require remounting on
// pod update, like Downward API volumes).
dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
podName: podName,
pod: pod,
volumeSpec: volumeSpec,
outerVolumeSpecName: outerVolumeSpecName,
}
3.3.4 对pod name标记为已处理,actual_state_of_world标记重新挂载
// some of the volume additions may have failed, should not mark this pod as fully processed
if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
// New pod has been synced. Re-mount all volumes that need it
// (e.g. DownwardAPI)
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
}
4. reconcile函数
路径 pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
4.1 对于实际已经挂载的与预期不一样的需要unmount
UnmountVolume函数中处理,volume分为filesystem与block,在desiredStateOfWorld不包括actualStateOfWorld的情况需要unmount
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
glog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
glog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
}
}
}
4.2 从desiredStateOfWorld中获取需要mount的volomes
调用PodExistesInVolume与实际podName进行对比
attachdVolumes不存在volumeName则为volumeNotAttachedError
mountedPods中存在podName,podObj.remountRequired为 newRemountRequiredError, podObj.fsResizeRequired为newFsResizeRequiredError
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
4.3 err为volumeNotAttachedError的情况
如果volume没有attach,actualStateOfWorld结构体中attachedVolumes没有volumeName,在则调用AttachVolume操作
if cache.IsVolumeNotAttachedError(err) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
glog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
glog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
glog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
}
}
}
原文链接:
https://blog.csdn.net/zhonglinzhang/article/details/82800287
推荐阅读
以上是关于kubelet启动之启动oomWatcher模块的主要内容,如果未能解决你的问题,请参考以下文章