k8s client-go源码分析 informer源码分析-初始化与启动分析

Posted 格格巫 MMQ!!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了k8s client-go源码分析 informer源码分析-初始化与启动分析相关的知识,希望对你有一定的参考价值。

informer架构
先来回忆一下informer的架构。

k8s client-go informer主要包括以下部件:
(1)Reflector:Reflector从kube-apiserver中list&watch资源对象,然后调用DeltaFIFO的Add/Update/Delete/Replace方法将资源对象及其变化包装成Delta并将其丢到DeltaFIFO中;
(2)DeltaFIFO:DeltaFIFO中存储着一个map和一个queue,即map[object key]Deltas以及object key的queue,Deltas为Delta的切片类型,Delta装有对象及对象的变化类型(Added/Updated/Deleted/Sync) ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出;
(3)Controller:Controller从DeltaFIFO的queue中pop一个object key出来,并获取其关联的 Deltas出来进行处理,遍历Deltas,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生;
(4)Processor:Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化;
(5)Indexer:Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力;
(6)ResourceEventHandler:用户根据自身处理逻辑需要,注册自定义的的ResourceEventHandler,当对象发生变化时,将触发调用对应类型的ResourceEventHandler来做处理。

概述

factory := informers.NewSharedInformerFactory(client, 30*time.Second)
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()

go factory.Start(stopper)

if !cache.WaitForCacheSync(stopper, informer.HasSynced)
runtime.HandleError(fmt.Errorf(“Timed out waiting for caches to sync”))
return


上一节有列举了informer的使用代码,注意看到示例代码中的下面这段代码,做了informer初始化与启动,其中包括:
(1)informers.NewSharedInformerFactory:初始化informer factory;
(2)podInformer.Informer:初始化pod informer;
(3)factory.Start:启动informer factory;
(4)cache.WaitForCacheSync:等待list操作获取到的对象都同步到informer本地缓存Indexer中;

下面也将根据这四部分进行informer的初始化与启动分析。

基于k8s v1.17.4版本依赖的client-go
1.SharedInformerFactory的初始化
1.1 sharedInformerFactory结构体
先来看下sharedInformerFactory结构体,看下里面有哪些属性。

看到几个比较重要的属性:
(1)client:连接k8s的clientSet;
(2)informers:是个map,可以装各个对象的informer;
(3)startedInformers:记录已经启动的informer;

// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration

informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool


1.2 NewSharedInformerFactory
NewSharedInformerFactory方法用于初始化informer factory,主要是初始化并返回sharedInformerFactory结构体。

// staging/src/k8s.io/client-go/informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory
return NewSharedInformerFactoryWithOptions(client, defaultResync)

func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory
return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options …SharedInformerOption) SharedInformerFactory
factory := &sharedInformerFactory
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),

// Apply all options
for _, opt := range options 
	factory = opt(factory)


return factory


2.对象informer的初始化
上一节有列举了informer的使用代码,注意看到示例代码中的下面这段代码,这里利用了工厂方法设计模式,podInformer.Informer()即初始化了sharedInformerFactory中的pod的informer,具体调用关系可自行看如下代码,比较简单,这里不再展开分析。

// 初始化informer factory以及pod informer
factory := informers.NewSharedInformerFactory(client, 30*time.Second)
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()

2.1 podInformer.Informer
Informer方法中调用了f.factory.InformerFor方法来做pod informer的初始化。

// k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) Informer() cache.SharedIndexInformer
return f.factory.InformerFor(&corev1.Pod, f.defaultInformer)

2.2 f.factory.InformerFor
Informer方法中调用了f.factory.InformerFor方法来做pod informer的初始化,并传入f.defaultInformer作为newFunc,而在f.factory.InformerFor方法中,调用newFunc来初始化informer。

这里也可以看到,其实informer初始化后会存储进map f.informers[informerType]中,即存储进sharedInformerFactory结构体的informers属性中,方便共享使用。

// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
f.lock.Lock()
defer f.lock.Unlock()

informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists 
	return informer


resyncPeriod, exists := f.customResync[informerType]
if !exists 
	resyncPeriod = f.defaultResync


informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer

return informer


2.3 newFunc/f.defaultInformer
defaultInformer方法中,调用了NewFilteredPodInformer方法来初始化pod informer,最终初始化并返回sharedIndexInformer结构体。

// k8s.io/client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexerscache.NamespaceIndex: cache.MetaNamespaceIndexFunc, f.tweakListOptions)

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer
return cache.NewSharedIndexInformer(
&cache.ListWatch
ListFunc: func(options metav1.ListOptions) (runtime.Object, error)
if tweakListOptions != nil
tweakListOptions(&options)

return client.CoreV1().Pods(namespace).List(options)
,
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error)
if tweakListOptions != nil
tweakListOptions(&options)

return client.CoreV1().Pods(namespace).Watch(options)
,
,
&corev1.Pod,
resyncPeriod,
indexers,
)

func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer
realClock := &clock.RealClock
sharedIndexInformer := &sharedIndexInformer
processor: &sharedProcessorclock: realClock,
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf(“%T”, objType)),
clock: realClock,

return sharedIndexInformer

2.4 sharedIndexInformer结构体
sharedIndexInformer结构体中重点看到以下几个属性:
(1)indexer:对应着informer中的部件Indexer,Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力;
(2)controller:对应着informer中的部件Controller,Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生;
(3)processor:对应着informer中的部件Processor,Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct
indexer Indexer
controller Controller

processor             *sharedProcessor
cacheMutationDetector CacheMutationDetector

// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher
objectType    runtime.Object

// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration
// clock allows for testability
clock clock.Clock

started, stopped bool
startedLock      sync.Mutex

// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex


Indexer接口与cache结构体
cache结构体为Indexer接口的实现;

// staging/src/k8s.io/client-go/tools/cache/store.go
type cache struct
cacheStorage ThreadSafeStore
keyFunc KeyFunc

threadSafeMap struct是ThreadSafeStore接口的一个实现,其最重要的一个属性便是items了,items是用map构建的键值对,资源对象都存在items这个map中,key根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct
lock sync.RWMutex
items map[string]interface

// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices


关于Indexer的详细分析会在后续有专门的文章做分析,这里不展开分析;

controller结构体
而controller结构体则包含了informer中的主要部件Reflector以及DeltaFIFO;
(1)Reflector:Reflector从kube-apiserver中list&watch资源对象,然后将对象的变化包装成Delta并将其丢到DeltaFIFO中;
(2)DeltaFIFO:DeltaFIFO存储着map[object key]Deltas以及object key的queue,Delta装有对象及对象的变化类型 ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出;

// staging/src/k8s.io/client-go/tools/cache/controller.go
type controller struct
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock

type Config struct
// The queue for your objects; either a FIFO or
// a DeltaFIFO. Your Process() function should accept
// the output of this Queue’s Pop() method.
Queue


3.启动sharedInformerFactory
sharedInformerFactory.Start为informer factory的启动方法,其主要逻辑为循环遍历informers,然后跑goroutine调用informer.Run来启动sharedInformerFactory中存储的各个informer。

// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct)
f.lock.Lock()
defer f.lock.Unlock()

for informerType, informer := range f.informers 
	if !f.startedInformers[informerType] 
		go informer.Run(stopCh)
		f.startedInformers[informerType] = true
	


sharedIndexInformer.Run
sharedIndexInformer.Run用于启动informer,主要逻辑为:
(1)调用NewDeltaFIFO,初始化DeltaFIFO;
(2)构建Config结构体,这里留意下Process属性,赋值了s.HandleDeltas,后面会分析到该方法;
(3)调用New,利用Config结构体来初始化controller;
(4)调用s.processor.run,启动processor;
(5)调用s.controller.Run,启动controller;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct)
defer utilruntime.HandleCrash()

// 初始化DeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

// 构建Config结构体
cfg := &Config
	Queue:            fifo,
	ListerWatcher:    s.listerWatcher,
	ObjectType:       s.objectType,
	FullResyncPeriod: s.resyncCheckPeriod,
	RetryOnError:     false,
	ShouldResync:     s.processor.shouldResync,

	Process: s.HandleDeltas,


func() 
	s.startedLock.Lock()
	defer s.startedLock.Unlock()
    // 初始化controller
	s.controller = New(cfg)
	s.controller.(*controller).clock = s.clock
	s.started = true
()

// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct)
var wg wait.Group
defer wg.Wait()              // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 启动processor
wg.StartWithChannel(processorStopCh, s.processor.run)

defer func() 
	s.startedLock.Lock()
	defer s.startedLock.Unlock()
	s.stopped = true // Don't want any new listeners
()
// 启动controller
s.controller.Run(stopCh)


3.1 New
New函数初始化了controller并return。

// staging/src/k8s.io/client-go/tools/cache/controller.go
func New(c *Config) Controller
ctlr := &controller
config: *c,
clock: &clock.RealClock,

return ctlr

3.2 s.processor.run
s.processor.run启动了processor,其中注意到listener.run与listener.pop两个核心方法即可,暂时没有用到,等下面用到他们的时候再做分析。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct)
func()
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners
p.wg.Start(listener.run)
p.wg.Start(listener.pop)

p.listenersStarted = true
()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop

p.wg.Wait() // Wait for all .pop() and .run() to stop

3.3 controller.Run
controller.Run为controller的启动方法,这里主要看到几个点:
(1)调用NewReflector,初始化Reflector;
(2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector;
(3)调用c.processLoop,开始controller的核心处理;

// k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct)
defer utilruntime.HandleCrash()
go func()
<-stopCh
c.config.Queue.Close()
()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock

c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group
defer wg.Wait()

wg.StartWithChannel(stopCh, r.Run)

wait.Until(c.processLoop, time.Second, stopCh)


3.3.1 Reflector结构体
先来看到Reflector结构体,这里重点看到以下属性:
(1)expectedType:放到Store中(即DeltaFIFO中)的对象类型;
(2)store:store会赋值为DeltaFIFO,具体可以看之前的informer初始化与启动分析即可得知,这里不再展开分析;
(3)listerWatcher:存放list方法和watch方法的ListerWatcher interface实现;

// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct

expectedType reflect.Type
store Store
listerWatcher ListerWatcher


3.3.2 r.Run/Reflector.Run
Reflector.Run方法中启动了Reflector,而Reflector的核心处理逻辑为从kube-apiserver处做list&watch操作,然后将得到的对象封装存储进DeltaFIFO中。

// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct)
klog.V(3).Infof(“Starting reflector %v (%s) from %s”, r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func()
if err := r.ListAndWatch(stopCh); err != nil
utilruntime.HandleError(err)

, r.period, stopCh)

3.3.3 controller.processLoop
controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来,然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。

// k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop()
for
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil
if err == ErrFIFOClosed
return

if c.config.RetryOnError
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)




3.3.4 c.config.Process/sharedIndexInformer.HandleDeltas
根据前面sharedIndexInformer.Run方法的分析中可以得知,c.config.Process其实就是sharedIndexInformer.HandleDeltas。

HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute通知自定义的ResourceEventHandler。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface) error
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newest
for _, d := range obj.(Deltas) 
	switch d.Type 
	case Sync, Added, Updated:
		isSync := d.Type == Sync
		s.cacheMutationDetector.AddObject(d.Object)
		if old, exists, err := s.indexer.Get(d.Object); err == nil && exists 
			if err := s.indexer.Update(d.Object); err != nil 
				return err
			
			s.processor.distribute(updateNotificationoldObj: old, newObj: d.Object, isSync)
		 else 
			if err := s.indexer.Add(d.Object); err != nil 
				return err
			
			s.processor.distribute(addNotificationnewObj: d.Object, isSync)
		
	case Deleted:
		if err := s.indexer.Delete(d.Object); err != nil 
			return err
		
		s.processor.distribute(deleteNotificationoldObj: d.Object, false)
	

return nil


怎么通知到自定义的ResourceEventHandler呢?继续往下看。

3.3.5 sharedIndexInformer.processor.distribute
可以看到distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh中。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface, sync bool)
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()

if sync 
	for _, listener := range p.syncingListeners 
		listener.add(obj)
	
 else 
	for _, listener := range p.listeners 
		listener.add(obj)
	

func (p *processorListener) add(notification interface)
p.addCh <- notification

到这里,processor中的listener.pop以及listener.run方法终于派上了用场,继续往下看。

3.3.6 listener.pop
分析processorListener的pop方法可以得知,其逻辑实际上就是将p.addCh中的对象给拿出来,然后丢进了p.nextCh中。那么谁来处理p.nextCh呢?继续往下看。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop()
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop

var nextCh chan<- interface
var notification interface
for 
	select 
	case nextCh <- notification:
		// Notification dispatched
		var ok bool
		notification, ok = p.pendingNotifications.ReadOne()
		if !ok  // Nothing to pop
			nextCh = nil // Disable this select case
		
	case notificationToAdd, ok := <-p.addCh:
		if !ok 
			return
		
		if notification == nil  // No notification to pop (and pendingNotifications is empty)
			// Optimize the case - skip adding to pendingNotifications
			notification = notificationToAdd
			nextCh = p.nextCh
		 else  // There is already a notification waiting to be dispatched
			p.pendingNotifications.WriteOne(notificationToAdd)
		
	


3.3.7 listener.run
在processorListener的run方法中,将循环读取p.nextCh,判断对象类型,是updateNotification则调用p.handler.OnUpdate方法,是addNotification则调用p.handler.OnAdd方法,是deleteNotification则调用p.handler.OnDelete方法做处理。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run()
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, the offending item will be skipped!, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct)
wait.Until(func()
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error)
for next := range p.nextCh
switch notification := next.(type)
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf(“unrecognized notification: %T”, next))


// the only way to get here is if the p.nextCh is empty and closed
return true, nil
)

	// the only way to get here is if the p.nextCh is empty and closed
	if err == nil 
		close(stopCh)
	
, 1*time.Minute, stopCh)

而p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete方法实际上就是自定义的的ResourceEventHandlerFuncs了。

informer.AddEventHandler(cache.ResourceEventHandlerFuncs
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
)
// staging/src/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandlerFuncs struct
AddFunc func(obj interface)
UpdateFunc func(oldObj, newObj interface)
DeleteFunc func(obj interface)

func (r ResourceEventHandlerFuncs) OnAdd(obj interface)
if r.AddFunc != nil
r.AddFunc(obj)

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface)
if r.UpdateFunc != nil
r.UpdateFunc(oldObj, newObj)

func (r ResourceEventHandlerFuncs) OnDelete(obj interface)
if r.DeleteFunc != nil
r.DeleteFunc(obj)


4.cache.WaitForCacheSync(stopper, informer.HasSynced)
可以看出在cache.WaitForCacheSync方法中,实际上是调用方法入参cacheSyncs …InformerSynced来判断cache是否同步完成(即调用informer.HasSynced方法),而这里说的cache同步完成,意思是等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func WaitForCacheSync(stopCh <-chan struct, cacheSyncs …InformerSynced) bool
err := wait.PollImmediateUntil(syncedPollPeriod,
func() (bool, error)
for _, syncFunc := range cacheSyncs
if !syncFunc()
return false, nil


return true, nil
,
stopCh)
if err != nil
klog.V(2).Infof(“stop requested”)
return false

klog.V(4).Infof("caches populated")
return true


4.1 informer.HasSynced
HasSynced方法实际上是调用了sharedIndexInformer.controller.HasSynced方法;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HasSynced() bool
s.startedLock.Lock()
defer s.startedLock.Unlock()

if s.controller == nil 
	return false

return s.controller.HasSynced()


s.controller.HasSynced
这里的c.config.Queue.HasSynced()方法,实际上是指DeltaFIFO的HasSynced方法,会在DeltaFIFO的分析中再详细分析,这里只需要知道当informer的list操作获取的对象都存入到informer中的indexer本地缓存中则返回true即可;

// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) HasSynced() bool
return c.config.Queue.HasSynced()

4.2 sharedInformerFactory.WaitForCacheSync
可以顺带看下sharedInformerFactory.WaitForCacheSync方法,其实际上是遍历factory中的所有informer,调用cache.WaitForCacheSync,然后传入每个informer的HasSynced方法作为入参;

// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct) map[reflect.Type]bool
informers := func() map[reflect.Type]cache.SharedIndexInformer
f.lock.Lock()
defer f.lock.Unlock()

	informers := map[reflect.Type]cache.SharedIndexInformer
	for informerType, informer := range f.informers 
		if f.startedInformers[informerType] 
			informers[informerType] = informer
		
	
	return informers
()

res := map[reflect.Type]bool
for informType, informer := range informers 
	res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)

return res


至此,整个informer的初始化与启动的分析就结束了,后面会对informer中的各个核心部件进行详细分析,敬请期待。

总结
下面用两张图片总结一下informer的初始化与启动;

informer初始化

informer启动

以上是关于k8s client-go源码分析 informer源码分析-初始化与启动分析的主要内容,如果未能解决你的问题,请参考以下文章

k8s源码Client-go中Reflector解析

client-go 源码分析

《k8s 源码分析》- Custom Controller 之 Informer

linux常用命令

client-go系列之5---Informer

Kubernetes client-go DeltaFIFO 源码分析