9. kubebuilder 进阶: 源码分析

Posted mohuishou

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了9. kubebuilder 进阶: 源码分析相关的知识,希望对你有一定的参考价值。

在前面的文章当中我们已经完整的完成了一个 Operator 的开发,涉及到了 CURD、预删除、Status、Event、OwnerReference、WebHook,也算是将一个 Operator 开发中会涉及到的点大部分都了解了一下。kubebuilder 帮我们做了很多事情,让我们的开发基本上只需要关注一个 Reconcile 函数就可以了,但是从另外一个方面来讲,kubebuilder 目前对我们来说它还是一个黑盒,会产生很多的疑问:

  • Reconcile 方法是怎么被触发的?
  • 怎么识别到不同的资源?
  • 整体是如何进行工作的?
  • ......

架构

我们先来看一下来自官方文档的这个架构图[^1]

arch
  • Process 进程通过 main.go启动,一般来说一个 Controller 只有一个进程,如果做了高可用的话,会有多个
  • Manager 每个进程会有一个 Manager,这是核心组件,主要负责
    • metrics 的暴露
    • webhook 证书
    • 初始化共享的 cache
    • 初始化共享的 clients 用于和 APIServer 进行通信
    • 所有的  Controller 的运行
  • Client 一般来说,我们 创建、更新、删除某个资源的时候会直接调用 Client 和 APIServer 进行通信
  • Cache 负责同步 Controller 关心的资源,其核心是 GVK -> Informer 的映射,一般我们的 Get 和 List 操作都会从 Cache 中获取数据
  • Controller 控制器的业务逻辑所在的地方,一个 Manager 可能会有多个 Controller,我们一般只需要实现 Reconcile 方法就行。图上的 Predicate 是事件过滤器,我们可以在 Controller 中过滤掉我们不关心的事件信息
  • WebHook 就是我们准入控制实现的地方了,主要是有两类接口,一个是 MutatingAdmissionWebhook 需要实现  Defaulter 接口,一个是 ValidatingAdmissionWebhook 需要实现 Validator 接口

源码分析

了解了基本的架构之后,我们就从入口 main.go 开始,看一看 kubebuilder 究竟在后面偷偷的做了哪些事情吧。

main.go

 // 省略了参数绑定和 error check 的代码
func main() {
 var metricsAddr string
 var enableLeaderElection bool
 var probeAddr string

 ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

 mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
  Scheme:                 scheme,
  MetricsBindAddress:     metricsAddr,
  Port:                   9443,
  HealthProbeBindAddress: probeAddr,
  LeaderElection:         enableLeaderElection,
  LeaderElectionID:       "97acaccf.lailin.xyz",
  // CertDir:                "config/cert/", // 手动指定证书位置用于测试
 })
 

 (&controllers.NodePoolReconciler{
  Client:   mgr.GetClient(),
  Log:      ctrl.Log.WithName("controllers").WithName("NodePool"),
  Scheme:   mgr.GetScheme(),
  Recorder: mgr.GetEventRecorderFor("NodePool"),
 }).SetupWithManager(mgr)

 (&nodesv1.NodePool{}).SetupWebhookWithManager(mgr)
  
 //+kubebuilder:scaffold:builder

 mgr.AddHealthzCheck("healthz", healthz.Ping)
 mgr.AddReadyzCheck("readyz", healthz.Ping)

 setupLog.Info("starting manager")
 mgr.Start(ctrl.SetupSignalHandler())
}

可以看到 main.go 主要是做了一些启动的工作包括:

  • 创建一个 Manager
  • 使用刚刚创建的 Manager 创建了一个 Controller
  • 启动 WebHook
  • 添加健康检查
  • 启动 Manager

下面我们就顺着  main 函数里面的逻辑一步步的往下看看

NewManger

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
 // 省略配置初始化相关代码

 // 创建 cache
 cache, err := options.NewCache(config, 
                                 cache.Options{
                                   Scheme: options.Scheme, // main 中传入的 scheme
                                   Mapper: mapper,         // k8s api 和 go type 的转换器
                                   Resync: options.SyncPeriod, // 默认 10 小时,一般不要改
                                   Namespace: options.Namespace, // 需要监听的 namespace
                                 })

  // 创建和 APIServer 交互的 client,读写分离
 clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
 apiReader, err := client.New(config, clientOptions)


 writeObj, err := options.ClientBuilder.
  WithUncached(options.ClientDisableCacheFor...).
  Build(cache, config, clientOptions)

 if options.DryRunClient {
  writeObj = client.NewDryRunClient(writeObj)
 }

 // 创建事件记录器
 recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)

 // 需要需要高可用的话,创建选举相关的配置
 leaderConfig := config
 if options.LeaderElectionConfig != nil {
  leaderConfig = options.LeaderElectionConfig
 }
 resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
  LeaderElection:             options.LeaderElection,
  LeaderElectionResourceLock: options.LeaderElectionResourceLock,
  LeaderElectionID:           options.LeaderElectionID,
  LeaderElectionNamespace:    options.LeaderElectionNamespace,
 })

 // 创建 metric 和 健康检查的接口
 metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)

 // By default we have no extra endpoints to expose on metrics http server.
 metricsExtraHandlers := make(map[string]http.Handler)

 // Create health probes listener. This will throw an error if the bind
 // address is invalid or already in use.
 healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
 if err != nil {
  return nil, err
 }

  // 最后将这些配置放到 manager 中
 return &controllerManager{
  config:                  config,
  scheme:                  options.Scheme,
  cache:                   cache,
  fieldIndexes:            cache,
  client:                  writeObj,
  apiReader:               apiReader,
  recorderProvider:        recorderProvider,
  resourceLock:            resourceLock,
  mapper:                  mapper,
  metricsListener:         metricsListener,
  metricsExtraHandlers:    metricsExtraHandlers,
  logger:                  options.Logger,
  elected:                 make(chan struct{}),
  port:                    options.Port,
  host:                    options.Host,
  certDir:                 options.CertDir,
  leaseDuration:           *options.LeaseDuration,
  renewDeadline:           *options.RenewDeadline,
  retryPeriod:             *options.RetryPeriod,
  healthProbeListener:     healthProbeListener,
  readinessEndpointName:   options.ReadinessEndpointName,
  livenessEndpointName:    options.LivenessEndpointName,
  gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
  internalProceduresStop:  make(chan struct{}),
 }, nil
}

创建 Cache

func New(config *rest.Config, opts Options) (Cache, error) {
 opts, err := defaultOpts(config, opts)
 if err != nil {
  return nil, err
 }
 im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
 return &informerCache{InformersMap: im}, nil
}

这里主要是调用 NewInformersMap方法创建 Informer 的映射

func NewInformersMap(config *rest.Config,
 scheme *runtime.Scheme,
 mapper meta.RESTMapper,
 resync time.Duration,
 namespace string)
 *InformersMap
 {

 return &InformersMap{
  structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
  unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
  metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),

  Scheme: scheme,
 }
}

NewInformersMap会去分别创建,结构化、非结构化以及 metadata 的 InformerMap 而这些方法最后都会去调用 newSpecificInformersMap方法,区别就是不同的方法传入的 createListWatcherFunc 参数不同

func newSpecificInformersMap(config *rest.Config,
 scheme *runtime.Scheme,
 mapper meta.RESTMapper,
 resync time.Duration,
 namespace string,
 createListWatcher createListWatcherFunc)
 *specificInformersMap
 {
 ip := &specificInformersMap{
  config:            config,
  Scheme:            scheme,
  mapper:            mapper,
  informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
  codecs:            serializer.NewCodecFactory(scheme),
  paramCodec:        runtime.NewParameterCodec(scheme),
  resync:            resync,
  startWait:         make(chan struct{}),
  createListWatcher: createListWatcher,
  namespace:         namespace,
 }
 return ip
}

newSpecificInformersMap 和常规的  InformersMap 类似,区别是没实现 WaitForCacheSync方法

以结构化的传入的 createStructuredListWatch 为例,主要是返回一个用于创建 SharedIndexInformer 的 ListWatch 对象

func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
 // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
 // groupVersionKind to the Resource API we will use.
 mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 if err != nil {
  return nil, err
 }

 client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
 if err != nil {
  return nil, err
 }
 listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
 listObj, err := ip.Scheme.New(listGVK)
 if err != nil {
  return nil, err
 }

 // TODO: the functions that make use of this ListWatch should be adapted to
 //  pass in their own contexts instead of relying on this fixed one here.
 ctx := context.TODO()
 // Create a new ListWatch for the obj
 return &cache.ListWatch{
  ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
   res := listObj.DeepCopyObject()
   isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
   err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
   return res, err
  },
  // Setup the watch function
  WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
   // Watch needs to be set to true separately
   opts.Watch = true
   isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
   return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
  },
 }, nil
}

小结: cache 主要是创建了一些 InformerMap,完成了 GVK 到 Informer 的映射,每个 Informer 会根据 ListWatch 函数对对应的 GVK 进行 List 和 Watch。

创建 Client

func New(config *rest.Config, options Options) (Client, error) {
 if config == nil {
  return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
 }

 // Init a scheme if none provided
 if options.Scheme == nil {
  options.Scheme = scheme.Scheme
 }

 // Init a Mapper if none provided
 if options.Mapper == nil {
  var err error
  options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
  if err != nil {
   return nil, err
  }
 }

 clientcache := &clientCache{
  config: config,
  scheme: options.Scheme,
  mapper: options.Mapper,
  codecs: serializer.NewCodecFactory(options.Scheme),

  structuredResourceByType:   make(map[schema.GroupVersionKind]*resourceMeta),
  unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
 }

 rawMetaClient, err := metadata.NewForConfig(config)
 if err != nil {
  return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err)
 }

 c := &client{
  typedClient: typedClient{
   cache:      clientcache,
   paramCodec: runtime.NewParameterCodec(options.Scheme),
  },
  unstructuredClient: unstructuredClient{
   cache:      clientcache,
   paramCodec: noConversionParamCodec{},
  },
  metadataClient: metadataClient{
   client:     rawMetaClient,
   restMapper: options.Mapper,
  },
  scheme: options.Scheme,
  mapper: options.Mapper,
 }

 return c, nil
}

client 创建了两个一个用于读,一个用于写,用于读的会直接使用上面的 cache,用于写的才会直接和 APIServer 进行交互

Controller

下面我们看一下核心的 Controller 是怎么初始化和工作的

if err = (&controllers.NodePoolReconciler{
  Client:   mgr.GetClient(),
  Log:      ctrl.Log.WithName("controllers").WithName("NodePool"),
  Scheme:   mgr.GetScheme(),
  Recorder: mgr.GetEventRecorderFor("NodePool"),
}).SetupWithManager(mgr); err != nil {
  setupLog.Error(err, "unable to create controller""controller""NodePool")
  os.Exit(1)
}

main.go 的方法里面主要是初始化了 Controller 的结构体,然后调用了 SetupWithManager方法

// SetupWithManager sets up the controller with the Manager.
func (r *NodePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
 return ctrl.NewControllerManagedBy(mgr).
  For(&nodesv1.NodePool{}).
  Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}).
  Complete(r)
}

SetupWithManager之前有讲到过,主要是使用了建造者模式,去构建了我们需要监听的对象,只有这些对象的相关事件才会触发我们的 Reconcile 逻辑。这里面的 Complete 最后其实是调用了 Build 方法

func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
 // 省略参数校验

 // Set the Config
 blder.loadRestConfig()

 // Set the ControllerManagedBy
 if err := blder.doController(r); err != nil {
  return nil, err
 }

 // Set the Watch
 if err := blder.doWatch(); err != nil {
  return nil, err
 }

 return blder.ctrl, nil
}

Build主要调用 doControllerdoWatch两个方法

func (blder *Builder) doController(r reconcile.Reconciler) error {
 ctrlOptions := blder.ctrlOptions
 if ctrlOptions.Reconciler == nil {
  ctrlOptions.Reconciler = r
 }

 // Retrieve the GVK from the object we're reconciling
 // to prepopulate logger information, and to optionally generate a default name.
 gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
 if err != nil {
  return err
 }

 // Setup the logger.
 if ctrlOptions.Log == nil {
  ctrlOptions.Log = blder.mgr.GetLogger()
 }
 ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

 // Build the controller and return.
 blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
 return err
}

doController主要是初始化了一个 Controller,这里面传入了我们实现 的Reconciler以及获取到我们的 GVK 的名称

func (blder *Builder) doWatch() error {
 // Reconcile type
 typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
 if err != nil {
  return err
 }
 src := &source.Kind{Type: typeForSrc}
 hdler := &handler.EnqueueRequestForObject{}
 allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
 if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
  return err
 }

 // Watches the managed types
 for _, own := range blder.ownsInput {
  typeForSrc, err := blder.project(own.object, own.objectProjection)
  if err != nil {
   return err
  }
  src := &source.Kind{Type: typeForSrc}
  hdler := &handler.EnqueueRequestForOwner{
   OwnerType:    blder.forInput.object,
   IsController: true,
  }
  allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
  allPredicates = append(allPredicates, own.predicates...)
  if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
   return err
  }
 }

 // Do the watch requests
 for _, w := range blder.watchesInput {
  allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
  allPredicates = append(allPredicates, w.predicates...)

  // If the source of this watch is of type *source.Kind, project it.
  if srckind, ok := w.src.(*source.Kind); ok {
   typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
   if err != nil {
    return err
   }
   srckind.Type = typeForSrc
  }

  if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
   return err
  }
 }
 return nil
}

Watch 主要是监听我们想要的资源变化,blder.ctrl.Watch(src, hdler, allPredicates...)通过过滤源事件的变化,allPredicates是过滤器,只有所有的过滤器都返回 true 时,才会将事件传递给 EventHandler hdler,这里会将 Handler 注册到 Informer 上

启动

func (cm *controllerManager) Start(ctx context.Context) (err error) {
 cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

 // 这个用来表示所有的协程都已经退出了,
 stopComplete := make(chan struct{})
 defer close(stopComplete)
 
  // ......

 // 用于保存错误
 cm.errChan = make(chan error)

 // 如果需要 metric 就启动 metric 服务
 if cm.metricsListener != nil {
  go cm.serveMetrics()
 }

 // 启动健康检查服务
 if cm.healthProbeListener != nil {
  go cm.serveHealthProbes()
 }

  
 go cm.startNonLeaderElectionRunnables()

 go func() {
  if cm.resourceLock != nil {
   err := cm.startLeaderElection()
   if err != nil {
    cm.errChan <- err
   }
  } else {
   // Treat not having leader election enabled the same as being elected.
   close(cm.elected)
   go cm.startLeaderElectionRunnables()
  }
 }()

  // 判断是否需要退出
 select {
 case <-ctx.Done():
  // We are done
  return nil
 case err := <-cm.errChan:
  // Error starting or running a runnable
  return err
 }
}

无论是不是 leader 最后都会使用 startRunnable 启动 Controller

func (cm *controllerManager) startNonLeaderElectionRunnables() {
 cm.mu.Lock()
 defer cm.mu.Unlock()

 cm.waitForCache(cm.internalCtx)

 // Start the non-leaderelection Runnables after the cache has synced
 for _, c := range cm.nonLeaderElectionRunnables {
  // Controllers block, but we want to return an error if any have an error starting.
  // Write any Start errors to a channel so we can return them
  cm.startRunnable(c)
 }
}

实际上是调用了 Controller 的 Start方法

// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {

  // Controller 只能被执行一次
 c.mu.Lock()
 if c.Started {
  return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
 }

 // Set the internal context.
 c.ctx = ctx

  // 获取队列
 c.Queue = c.MakeQueue()
 defer c.Queue.ShutDown()

 err := func() error {
  defer c.mu.Unlock()

  defer utilruntime.HandleCrash()

  // 尝试等待缓存
  for _, watch := range c.startWatches {
   c.Log.Info("Starting EventSource""source", watch.src)
   if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
    return err
   }
  }

  // 启动 Controller
  c.Log.Info("Starting Controller")

    
  for _, watch := range c.startWatches {
   syncingSource, ok := watch.src.(source.SyncingSource)
   if !ok {
    continue
   }
   if err := syncingSource.WaitForSync(ctx); err != nil {
    // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
    // Leaving it here because that could happen in the future
    err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
    c.Log.Error(err, "Could not wait for Cache to sync")
    return err
   }
  }

  // All the watches have been started, we can reset the local slice.
  //
  // We should never hold watches more than necessary, each watch source can hold a backing cache,
  // which won't be garbage collected if we hold a reference to it.
  c.startWatches = nil

  if c.JitterPeriod == 0 {
   c.JitterPeriod = 1 * time.Second
  }

  // Launch workers to process resources
  c.Log.Info("Starting workers""worker count", c.MaxConcurrentReconciles)
  ctrlmetrics.WorkerCount.WithLabelValues(c.Name).
              Set(float64(c.MaxConcurrentReconciles))
  for i := 0; i < c.MaxConcurrentReconciles; i++ {
   go wait.UntilWithContext(ctx, func(ctx context.Context) {
    // 查询队列中有没有关注的事件,有的话就触发我们的 reconcile 逻辑
    for c.processNextWorkItem(ctx) {
    }
   }, c.JitterPeriod)
  }

  c.Started = true
  return nil
 }()
 if err != nil {
  return err
 }

 <-ctx.Done()
 c.Log.Info("Stopping workers")
 return nil
}

// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 obj, shutdown := c.Queue.Get()
 if shutdown {
  // Stop working
  return false
 }

 // We call Done here so the workqueue knows we have finished
 // processing this item. We also must remember to call Forget if we
 // do not want this work item being re-queued. For example, we do
 // not call Forget if a transient error occurs, instead the item is
 // put back on the workqueue and attempted again after a back-off
 // period.
 defer c.Queue.Done(obj)

 ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
 defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

 c.reconcileHandler(ctx, obj)
 return true
}

总结

Reconcile 方法的触发是通过 Cache 中的 Informer 获取到资源的变更事件,然后再通过生产者消费者的模式触发我们自己实现的 Reconcile 方法的。

Kubebuilder 是一个非常好用的 Operator 开发框架,不仅极大的简化了 Operator 的开发过程,并且充分的利用了 go interface 的特性留下了足够的扩展性,这个我们可以学习,如果我们的业务代码开发框架能够做到这个地步,我觉得也就不错了。

参考文献

[^1]: 架构图 https://master.book.kubebuilder.io/architecture.html