kubebuilder笔记

Posted 南昌拌粉的成长

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kubebuilder笔记相关的知识,希望对你有一定的参考价值。

一、kubebuilder作用

  • 提供脚手架工具初始化 CRDs 工程,自动生成 boilerplate 代码和配置
  • 提供代码库封装底层的 K8s go-client

二、kubebuilder整体流程

  1. 用户自定义crd,将自定义的crd注册到scheme中,这样通过GVK能找到对应的go的struct,也能通过go的struct找对对应的GVK
  2. Cache监听Scheme中的GVK,同时和Api Server建立list-watch的连接
  3. 当发现某个controller需要的GVK资源发生状态的改变就reconcile调度对应的controller
  4. 对应的controller中是用户自己定义的逻辑,用来保证该类型的crd和k8s集群中声明的该资源的yaml/json中的字段一致
  5. 保证一致主要是通过调用Clients,然后Clients可以和Api Server交互

三、使用方法

1、初始化脚手架 这不创建了一个项目模板并引入一些依赖

kubebuilder init --domain ding.test.com

2、创建api 会在根目录下创建一个api目录 里头有: resourcename_types.go、groupversion_info.go、 zz_generated.deepcopy.go kubebuilder-process

kubebuilder create api --group ding.shin.com --version v1alpha666 --kind DingShinType

3、groupversion_info.go: 会声明 GroupVersion、SchemeBuilder、AddToScheme三个东西(也可以手动给改到别的文件引入) kubebuilder-catalog 每当再执行一遍craete api 如:

kubebuilder create api --group ding.shin.com --version v1alpha666 --kind DingShin888Type

就会多生成一个xxxxx_type.go

备注:

不同的自定义CRD需要调用 SchemeBuilder.Register 注册自己以及一个 list
之后在程序入口要将自定义的 CRD 们添加到 build-in(原生k8s的资源) 资源里面, 这样Cache就知道应该去Watch谁了
在程序的入口处,除了把自定义的 CRD 添加到 scheme 中,还需要使用 manager 初始化资源,如:

  // 1、init Manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.OptionsScheme: scheme, MetricsBindAddress: metricsAddr)
	if err != nil 
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	
  // 2、init Reconciler(Controller)
  // 主要吧Clien Log Scheme 等依赖传进去
	err = (&controllers.ApplicationReconciler
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("controllers").WithName("Application"),
		Scheme: mgr.GetScheme(),
	).SetupWithManager(mgr)
	if err != nil 
		setupLog.Error(err, "unable to create controller", "controller", "EDASApplication")
		os.Exit(1)
  
  
  // 3、调用 SetupWithManager 方法初始化Reconciler
  func (r *DingShin888TypeReconciler) SetupWithManager(mgr ctrl.Manager) error 
    return ctrl.NewControllerManagedBy(mgr).
      For(&dingshincomv1alpha666.DingShin888Type).
      Complete(r)
  

也可以自定义setup方法去初始化,如: 

func VolumeMounterSetup(mgr ctrl.Manager, l logr.Logger) error 
	name := "sage/" + strings.ToLower(sagecorev1alpha2.VolumeMounterTraitGroupKind)
	return ctrl.NewControllerManagedBy(mgr).
		Named(name).
		For(&sagecorev1alpha2.VolumeMounterTrait).
		Complete(
		NewVolumeMounterTraitReconciler(mgr,
				core.WithLogger(l.WithValues("controller", name)),
				core.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))))


// 里面主要得把 Scheme 以及 Client 和 Record 都传进去
func NewVolumeMounterTraitReconciler(m ctrl.Manager, o ...core.ReconcilerOption) *VolumeMounterTraitReconciler 
	it := &VolumeMounterTraitReconciler
	it.Client = m.GetClient()
	it.Scheme = m.GetScheme()
	it.Record = event.NewNopRecorder()
	for _, ro := range o 
		ro(&it.Reconciler)
	
	it.traitBuilder = sagetrait.NewVolumeMounterTraitBuilder(it.Client, it.Log)
	gc := sagegc.GarbageCollector
		Client: m.GetClient(),
		Log:    it.Log,
	
	it.GC = gc

	return it

四、启动自定义 CRD 以及 Controller 的大概流程

1、首先入口处 NewManager, NewManager 内部大概就是 NewClient 以及 NewCache

	mgr, err := ctrl.NewManager(cfg, ctrl.OptionsSyncPeriod: &options.SyncPeriod, MetricsBindAddress: options.MetricsAddr)

2、其中 NewCache 大概就是 new 了一个 informer 的 map,然后这个 map 以每个Kind,也就是每个 GVK 为 key,value 是它对应的 informer,每个 informer 都会创建一条 List Watch 和 Api Server 通信,监听对应的 Kind(GVK)

3、NewClient 就是创建了一个用于和 Api Server 通信的客户端,其中读操作直接去 Cache 中去读,写的话会间接调用 k8s 提供的 go-client

4、然后把自定义的 CRD 注册给scheme
先:

  // groupversion_info.go 下
  var (
    GroupVersion = schema.GroupVersionGroup: GROUP, Version: VERSION
  )

然后:

  // 可以在同一个package下调用它的 Register
  SchemeBuilder.Register(&ServerWorkload, &ServerWorkloadList)
	SchemeBuilder.Register(&TaskWorkload, &TaskWorkloadList)
  // ......其他 CRD

再:

  // 引入 SchemeBuilder 所在的那个 package
  import alpha666 "xxxxxx/yyyyyy/xxxxx"
  // 以及 client-go 提供的 schema,也就是哪些 build-in 的资源类型
  buildInScheme "k8s.io/client-go/kubernetes/scheme"
  scheme = runtime.NewScheme()
  _ = buildInScheme.AddToScheme(scheme)
	_ = alpha666.AddToScheme(scheme)

------------------------------------------------------------------

  // or:
  import (
    // 引入 runtime
    "k8s.io/apimachinery/pkg/runtime"
    // 引入 SchemeBuilder 所在的那个 package
    v1alpha1666 "xxxxxx/yyyyyy/xxxxx"
  )
  AddToSchemes = append(runtime.SchemeBuilder, v1alpha1666.SchemeBuilder.AddToScheme)
  
  mgr, err := ctrl.NewManager(cfg, ctrl.OptionsSyncPeriod: &options.SyncPeriod, MetricsBindAddress: options.MetricsAddr)

  s := mgr.GetScheme()
  AddToSchemes.AddToScheme(s)

5、直到调用完 AddToScheme 才算是这个 CRD(CRDS) 被注册完事儿,之后需要需要对所有的 CRD 进行Setup

func Setup(mgr ctrl.Manager, l logr.Logger) error 
	for _, setup := range []func(ctrl.Manager, logr.Logger) error
    // ...
    corecontroller.ServerSetup,
    // ...
	 
		if err := setup(mgr, l); err != nil 
			return err
		
	
	return nil
// 可以和 Reconcile 定义在一起
func ServerSetup(mgr ctrl.Manager, l logr.Logger) error 
	// ...
	return ctrl.NewControllerManagedBy(mgr).
		Named(name).
		For(&corev1alpha2.ServerWorkload).
		Complete(NewServerWorkloadReconciler(mgr,
			sagecore.WithLogger(l.WithValues("controller", name)),
			sagecore.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))))

func NewServerWorkloadReconciler(m ctrl.Manager, o ...sagecore.ReconcilerOption) *ServerWorkloadReconciler 
	sr := &ServerWorkloadReconciler
	sr.Client = m.GetClient()
	sr.Scheme = m.GetScheme()
	sr.WorkloadRender = sagerender.SageRender
  sr.Record = event.NewNopRecorder()
	proxy := &sageproxy.ApplicatorProxy
		Client: m.GetClient(),
		Scheme: m.GetScheme(),
		Log:    sr.Log,
	
	sr.Proxy = proxy
  // 还可以处理一些GC 相关
	return sr

7、在 ctrl.NewControllerManagedBy 之后总会调用 complete 方法
8、complete 内部大概逻辑是,doController 方法,初始化一个 Controller, 这个 Controller 有 Cache 负责注册 Watch,Client 负责Kind 的 CUD 没有R,Queue 负责对 Watch 到的资源的事件做缓存,Recorder 负责事件收集,还有重要的 Do 这个就是自己写的那个 Reconcile 方法。
9、之后内部还会触发一个 doWatch 方法,该方法里头初始化了一个 handler,然后调用上面初始化的 Controller 的 Watch 方法,但是 这个 Watch 不是真的开始监听,而是先初始化该类型的 CRD 的 Watch,包括把 handler,src源,queue 等传进去
10、complete 到这儿基本就算是完事儿了,之后调用 manager.Start 方法

return errors.Wrap(mgr.Start(ctrl.SetupSignalHandler()), "can not start controllers")

11、这个方法中,主要逻辑就是启动 Cache 以及 Controller
12、启动 Cache 的话,是去找第2步中的那个 informerMap, 然后把每一个 informer 都run起来,run起来就算是真正开始和 Api Server 建立 List Watch 链接开始监听该 Kind 的 GVK 了
13、每当 Api Server 给发送资源改变的消息之后,会触发对应的第9步中的 handler,handler 中一共有 delete,create,update 三个方法,但是这三种方法干的事儿都是一样的,就是单纯地将对应的资源的 name 和所在的 namespace 放入 queue中(具体到底是 CUR 哪个操作需要在 Reconcile 中自己判断)
14、最后启动 Controller,Controller 中会启动一个 goroutine 的协程不停地查询 queue,如果 queue 中还有东西的话,就 Get 出来,然后会调用该 controller 上的 Do(第5步) 上的 Reconcile 方法,这个方法就是真正自己定义的那个 Reconcile
注意,每个 CRD 都有自己的一个 controller,因为在注册每个 CRD 的时候都会调用 complete 方法,doController 就是在这个 complete 中初始化的

深入解析 Kubebuilder:让编写 CRD 变得更简单

技术图片
作者 | 刘洋(炎寻) 阿里云高级开发工程师

导读:自定义资源 CRD(Custom Resource Definition)可以扩展 Kubernetes API,掌握 CRD 是成为 Kubernetes 高级玩家的必备技能,本文将介绍 CRD 和 Controller 的概念,并对 CRD 编写框架 Kubebuilder 进行深入分析,让您真正理解并能快速开发 CRD。

概览

控制器模式与声明式 API


在正式介绍 Kubebuidler 之前,我们需要先了解下 K8s 底层实现大量使用的控制器模式,以及让用户大呼过瘾的声明式 API,这是介绍 CRDs 和 Kubebuidler 的基础。

控制器模式


K8s 作为一个“容器编排”平台,其核心的功能是编排,Pod 作为 K8s 调度的最小单位,具备很多属性和字段,K8s 的编排正是通过一个个控制器根据被控制对象的属性和字段来实现。


下面我们看一个例子:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: test
spec:
  selector:
    matchLabels:
      app: test
  replicas: 2
  template:
    metadata:
      labels:
        app: test
    spec:
      containers:
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80


K8s 集群在部署时包含了 Controllers 组件,里面对于每个 build-in 的资源类型(比如 Deployments, Statefulset, CronJob, ...)都有对应的 Controller,基本是 1:1 的关系。上面的例子中,Deployment 资源创建之后,对应的 Deployment Controller 编排动作很简单,确保携带了 app=test 的 Pod 个数永远等于 2,Pod 由 template 部分定义,具体来说,K8s 里面是 kube-controller-manager 这个组件在做这件事,可以看下 K8s 项目的 pkg/controller 目录,里面包含了所有控制器,都以独有的方式负责某种编排功能,但是它们都遵循一个通用编排模式,即:调谐循环(Reconcile loop),其伪代码逻辑为:

for 
actualState := GetResourceActualState(rsvc)
expectState := GetResourceExpectState(rsvc)
if actualState == expectState 
// do nothing
 else 
Reconcile(rsvc)


就是一个无限循环(实际是事件驱动+定时同步来实现,不是无脑循环)不断地对比期望状态和实际状态,如果有出入则进行 Reconcile(调谐)逻辑将实际状态调整为期望状态。期望状态就是我们的对象定义(通常是 YAML 文件),实际状态是集群里面当前的运行状态(通常来自于 K8s 集群内外相关资源的状态汇总),控制器的编排逻辑主要是第三步做的,这个操作被称为调谐(Reconcile),整个控制器调谐的过程称为“Reconcile Loop”,调谐的最终结果一般是对被控制对象的某种写操作,比如增/删/改 Pod。


在控制器中定义被控制对象是通过“模板”完成的,比如 Deployment 里面的 template 字段里的内容跟一个标准的 Pod 对象的 API 定义一样,所有被这个 Deployment 管理的 Pod 实例,都是根据这个 template 字段的创建的,这就是 PodTemplate,一个控制对象的定义一般是由上半部分的控制定义(期望状态),加上下半部分的被控制对象的模板组成。

声明式 API

所谓声明式就是“告诉 K8s 你要什么,而不是告诉它怎么做的命令”,一个很熟悉的例子就是 SQL,你“告诉 DB 根据条件和各类算子返回数据,而不是告诉它怎么遍历,过滤,聚合”。在 K8s 里面,声明式的体现就是 kubectl apply 命令,在对象创建和后续更新中一直使用相同的 apply 命令,告诉 K8s 对象的终态即可,底层是通过执行了一个对原有 API 对象的 PATCH 操作来实现的,可以一次性处理多个写操作,具备 Merge 能力 diff 出最终的 PATCH,而命令式一次只能处理一个写请求。
?
声明式 API 让 K8s 的“容器编排”世界看起来温柔美好,而控制器(以及容器运行时,存储,网络模型等)才是这太平盛世的幕后英雄。说到这里,就会有人希望也能像 build-in 资源一样构建自己的自定义资源(CRD-Customize Resource Definition),然后为自定义资源写一个对应的控制器,推出自己的声明式 API。K8s 提供了 CRD 的扩展方式来满足用户这一需求,而且由于这种扩展方式十分灵活,在最新的?1.15 版本对 CRD 做了相当大的增强。对于用户来说,实现 CRD 扩展主要做两件事:

  1. 编写 CRD 并将其部署到 K8s 集群里;

这一步的作用就是让 K8s 知道有这个资源及其结构属性,在用户提交该自定义资源的定义时(通常是 YAML 文件定义),K8s 能够成功校验该资源并创建出对应的 Go struct 进行持久化,同时触发控制器的调谐逻辑。

  1. 编写 Controller 并将其部署到 K8s 集群里。

这一步的作用就是实现调谐逻辑。


Kubebuilder 就是帮我们简化这两件事的工具,现在我们开始介绍主角。

Kubebuilder 是什么?

摘要

Kubebuilder?是一个使用 CRDs 构建 K8s API 的 SDK,主要是:

  • 提供脚手架工具初始化 CRDs 工程,自动生成 boilerplate 代码和配置;
  • 提供代码库封装底层的 K8s go-client;


方便用户从零开始开发 CRDs,Controllers 和 Admission Webhooks 来扩展 K8s。

核心概念

GVKs&GVRs

GVK = GroupVersionKind,GVR = GroupVersionResource。

API Group & Versions(GV)

API Group 是相关 API 功能的集合,每个 Group 拥有一或多个 Versions,用于接口的演进。

Kinds & Resources

每个 GV 都包含多个 API 类型,称为 Kinds,在不同的 Versions 之间同一个 Kind 定义可能不同, Resource 是 Kind 的对象标识(resource type),一般来说 Kinds 和 Resources 是 1:1 的,比如 pods Resource 对应 Pod Kind,但是有时候相同的 Kind 可能对应多个 Resources,比如 Scale Kind 可能对应很多 Resources:deployments/scale,replicasets/scale,对于 CRD 来说,只会是 1:1 的关系。
?
每一个 GVK 都关联着一个 package 中给定的 root Go type,比如 apps/v1/Deployment 就关联着 K8s 源码里面 k8s.io/api/apps/v1 package 中的 Deployment struct,我们提交的各类资源定义 YAML 文件都需要写:

  • apiVersion:这个就是 GV 。
  • kind:这个就是 K。


根据 GVK K8s 就能找到你到底要创建什么类型的资源,根据你定义的 Spec 创建好资源之后就成为了 Resource,也就是 GVR。GVK/GVR 就是 K8s 资源的坐标,是我们创建/删除/修改/读取资源的基础。

Scheme

每一组 Controllers 都需要一个 Scheme,提供了 Kinds 与对应 Go types 的映射,也就是说给定 Go type 就知道他的 GVK,给定 GVK 就知道他的 Go type,比如说我们给定一个 Scheme: "tutotial.kubebuilder.io/api/v1".CronJob 这个 Go type 映射到 batch.tutotial.kubebuilder.io/v1 的 CronJob GVK,那么从 Api Server 获取到下面的 JSON:



????"kind":?"CronJob",
????"apiVersion":?"batch.tutorial.kubebuilder.io/v1",
????...



就能构造出对应的 Go type了,通过这个 Go type 也能正确地获取 GVR 的一些信息,控制器可以通过该 Go type 获取到期望状态以及其他辅助信息进行调谐逻辑。

Manager

Kubebuilder 的核心组件,具有 3 个职责:

  • 负责运行所有的 Controllers;
  • 初始化共享 caches,包含 listAndWatch 功能;
  • 初始化 clients 用于与 Api Server 通信。

Cache

Kubebuilder 的核心组件,负责在 Controller 进程里面根据 Scheme 同步 Api Server 中所有该 Controller 关心 GVKs 的 GVRs,其核心是 GVK -> Informer 的映射,Informer 会负责监听对应 GVK 的 GVRs 的创建/删除/更新操作,以触发 Controller 的 Reconcile 逻辑。

Controller

Kubebuidler 为我们生成的脚手架文件,我们只需要实现 Reconcile 方法即可。

Clients

在实现 Controller 的时候不可避免地需要对某些资源类型进行创建/删除/更新,就是通过该 Clients 实现的,其中查询功能实际查询是本地的 Cache,写操作直接访问 Api Server。

Index

由于 Controller 经常要对 Cache 进行查询,Kubebuilder 提供 Index utility 给 Cache 加索引提升查询效率。

Finalizer

在一般情况下,如果资源被删除之后,我们虽然能够被触发删除事件,但是这个时候从 Cache 里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行,K8s 的 Finalizer 字段用于处理这种情况。在 K8s 中,只要对象 ObjectMeta 里面的 Finalizers 不为空,对该对象的 delete 操作就会转变为 update 操作,具体说就是 update??deletionTimestamp 字段,其意义就是告诉 K8s 的 GC“在deletionTimestamp 这个时刻之后,只要 Finalizers 为空,就立马删除掉该对象”。


所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据?Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。

OwnerReference

K8s GC 在删除一个对象时,任何 ownerReference 是该对象的对象都会被清除,与此同时,Kubebuidler 支持所有对象的变更都会触发 Owner 对象 controller 的 Reconcile 方法。


所有概念集合在一起如图 1 所示:

技术图片

图 1-Kubebuilder 核心概念
?

Kubebuilder 怎么用?

1. 创建脚手架工程

kubebuilder init --domain edas.io

这一步创建了一个 Go module 工程,引入了必要的依赖,创建了一些模板文件。

2. 创建 API?

kubebuilder create api --group apps --version v1alpha1 --kind Application


这一步创建了对应的 CRD 和 Controller 模板文件,经过 1、2 两步,现有的工程结构如图 2 所示:

技术图片

图 2-Kubebuilder 生成的工程结构说明

3. 定义 CRD

在图 2 中对应的文件定义 Spec 和 Status。

4. 编写 Controller 逻辑

在图 3 中对应的文件实现 Reconcile 逻辑。

5. 测试发布

本地测试完之后使用 Kubebuilder 的 Makefile 构建镜像,部署我们的 CRDs 和 Controller 即可。

Kubebuilder 出现的意义?

让扩展 K8s 变得更简单,K8s 扩展的方式很多,Kubebuilder 目前专注于 CRD 扩展方式。
?

深入

在使用 Kubebuilder 的过程中有些问题困扰着我:

  • 如何同步自定义资源以及 K8s build-in 资源?
  • Controller 的 Reconcile 方法是如何被触发的?
  • Cache 的工作原理是什么?
  • ...


带着这些问题我们去看看源码 :D。

源码阅读

从 main.go 开始

Kubebuilder 创建的 main.go 是整个项目的入口,逻辑十分简单:

var (
    scheme   = runtime.NewScheme()
    setupLog = ctrl.Log.WithName("setup")
)
func init() 
    appsv1alpha1.AddToScheme(scheme)
    // +kubebuilder:scaffold:scheme

func main() 
    ...
        // 1、init Manager
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.OptionsScheme: scheme, MetricsBindAddress: metricsAddr)
    if err != nil 
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    
        // 2、init Reconciler(Controller)
    err = (&controllers.ApplicationReconciler
        Client: mgr.GetClient(),
        Log:    ctrl.Log.WithName("controllers").WithName("Application"),
        Scheme: mgr.GetScheme(),
    ).SetupWithManager(mgr)
    if err != nil 
        setupLog.Error(err, "unable to create controller", "controller", "EDASApplication")
        os.Exit(1)
    
    // +kubebuilder:scaffold:builder
    setupLog.Info("starting manager")
        // 3、start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil 
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    


可以看到在 init 方法里面我们将 appsv1alpha1 注册到 Scheme 里面去了,这样一来 Cache 就知道 watch 谁了,main 方法里面的逻辑基本都是 Manager 的:

  1. 初始化了一个 Manager;
  2. 将 Manager 的 Client 传给 Controller,并且调用 SetupWithManager 方法传入 Manager 进行 Controller 的初始化;
  3. 启动 Manager。


我们的核心就是看这 3 个流程。

Manager 初始化

Manager 初始化代码如下:

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) 
    ...
    // Create the cache for the cached read client and registering informers
    cache, err := options.NewCache(config, cache.OptionsScheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace)
    if err != nil 
        return nil, err
    
    apiReader, err := client.New(config, client.OptionsScheme: options.Scheme, Mapper: mapper)
    if err != nil 
        return nil, err
    
    writeObj, err := options.NewClient(cache, config, client.OptionsScheme: options.Scheme, Mapper: mapper)
    if err != nil 
        return nil, err
    
    ...
    return &controllerManager
        config:           config,
        scheme:           options.Scheme,
        errChan:          make(chan error),
        cache:            cache,
        fieldIndexes:     cache,
        client:           writeObj,
        apiReader:        apiReader,
        recorderProvider: recorderProvider,
        resourceLock:     resourceLock,
        mapper:           mapper,
        metricsListener:  metricsListener,
        internalStop:     stop,
        internalStopper:  stop,
        port:             options.Port,
        host:             options.Host,
        leaseDuration:    *options.LeaseDuration,
        renewDeadline:    *options.RenewDeadline,
        retryPeriod:      *options.RetryPeriod,
    , nil


可以看到主要是创建 Cache 与 Clients:

创建 Cache

Cache 初始化代码如下:

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) 
    opts, err := defaultOpts(config, opts)
    if err != nil 
        return nil, err
    
    im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
    return &informerCacheInformersMap: im, nil

// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(...) *specificInformersMap 
    ip := &specificInformersMap
        Scheme:            scheme,
        mapper:            mapper,
        informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
        codecs:            serializer.NewCodecFactory(scheme),
        resync:            resync,
        createListWatcher: createListWatcher,
        namespace:         namespace,
    
    return ip

// MapEntry contains the cached data for an Informer
type MapEntry struct 
    // Informer is the cached informer
    Informer cache.SharedIndexInformer
    // CacheReader wraps Informer and implements the CacheReader interface for a single type
    Reader CacheReader

func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) 
        ...
    // Create a new ListWatch for the obj
    return &cache.ListWatch
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) 
            if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot 
                return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
            
            return dynamicClient.Resource(mapping.Resource).List(opts)
        ,
        // Setup the watch function
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) 
            // Watch needs to be set to true separately
            opts.Watch = true
            if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot 
                return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
            
            return dynamicClient.Resource(mapping.Resource).Watch(opts)
        ,
    , nil


可以看到 Cache 主要就是创建了 InformersMap,Scheme 里面的每个 GVK 都创建了对应的 Informer,通过 informersByGVK 这个 map 做 GVK 到 Informer 的映射,每个 Informer 会根据 ListWatch 函数对对应的 GVK 进行 List 和 Watch。

创建 Clients

创建 Clients 很简单:

// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) 
    // Create the Client for Write operations.
    c, err := client.New(config, options)
    if err != nil 
        return nil, err
    
    return &client.DelegatingClient
        Reader: &client.DelegatingReader
            CacheReader:  cache,
            ClientReader: c,
        ,
        Writer:       c,
        StatusClient: c,
    , nil


读操作使用上面创建的 Cache,写操作使用 K8s go-client 直连。

Controller 初始化

下面看看 Controller 的启动:

func (r *EDASApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error 
    err := ctrl.NewControllerManagedBy(mgr).
        For(&appsv1alpha1.EDASApplication).
        Complete(r)
return err


使用的是 Builder 模式,NewControllerManagerBy 和 For 方法都是给 Builder 传参,最重要的是最后一个方法 Complete,其逻辑是:

func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) 
...
    // Set the Manager
    if err := blder.doManager(); err != nil 
        return nil, err
    
    // Set the ControllerManagedBy
    if err := blder.doController(r); err != nil 
        return nil, err
    
    // Set the Watch
    if err := blder.doWatch(); err != nil 
        return nil, err
    
...
    return blder.mgr, nil


主要是看看 doController 和 doWatch 方法:

doController 方法

func New(name string, mgr manager.Manager, options Options) (Controller, error) 
    if options.Reconciler == nil 
        return nil, fmt.Errorf("must specify Reconciler")
    
    if len(name) == 0 
        return nil, fmt.Errorf("must specify Name for Controller")
    
    if options.MaxConcurrentReconciles <= 0 
        options.MaxConcurrentReconciles = 1
    
    // Inject dependencies into Reconciler
    if err := mgr.SetFields(options.Reconciler); err != nil 
        return nil, err
    
    // Create controller with dependencies set
    c := &controller.Controller
        Do:                      options.Reconciler,
        Cache:                   mgr.GetCache(),
        Config:                  mgr.GetConfig(),
        Scheme:                  mgr.GetScheme(),
        Client:                  mgr.GetClient(),
        Recorder:                mgr.GetEventRecorderFor(name),
        Queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
        MaxConcurrentReconciles: options.MaxConcurrentReconciles,
        Name:                    name,
    
    // Add the controller as a Manager components
    return c, mgr.Add(c)


该方法初始化了一个 Controller,传入了一些很重要的参数:

  • Do:Reconcile 逻辑;
  • Cache:找 Informer 注册 Watch;
  • Client:对 K8s 资源进行 CRUD;
  • Queue:Watch 资源的 CUD 事件缓存;
  • Recorder:事件收集。

doWatch 方法

func (blder *Builder) doWatch() error 
    // Reconcile type
    src := &source.KindType: blder.apiType
    hdler := &handler.EnqueueRequestForObject
    err := blder.ctrl.Watch(src, hdler, blder.predicates...)
    if err != nil 
        return err
    
    // Watches the managed types
    for _, obj := range blder.managedObjects 
        src := &source.KindType: obj
        hdler := &handler.EnqueueRequestForOwner
            OwnerType:    blder.apiType,
            IsController: true,
        
        if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil 
            return err
        
    
    // Do the watch requests
    for _, w := range blder.watchRequest 
        if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil 
            return err
        
    
    return nil


可以看到该方法对本 Controller 负责的 CRD 进行了 watch,同时底下还会 watch 本 CRD 管理的其他资源,这个 managedObjects 可以通过 Controller 初始化 Buidler 的 Owns 方法传入,说到 Watch 我们关心两个逻辑:

  1. 注册的 handler
type EnqueueRequestForObject struct
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) 
        ...
    q.Add(reconcile.RequestNamespacedName: types.NamespacedName
        Name:      evt.Meta.GetName(),
        Namespace: evt.Meta.GetNamespace(),
    )

// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) 
    if evt.MetaOld != nil 
        q.Add(reconcile.RequestNamespacedName: types.NamespacedName
            Name:      evt.MetaOld.GetName(),
            Namespace: evt.MetaOld.GetNamespace(),
        )
     else 
        enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
    
    if evt.MetaNew != nil 
        q.Add(reconcile.RequestNamespacedName: types.NamespacedName
            Name:      evt.MetaNew.GetName(),
            Namespace: evt.MetaNew.GetNamespace(),
        )
     else 
        enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
    

// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) 
        ...
    q.Add(reconcile.RequestNamespacedName: types.NamespacedName
        Name:      evt.Meta.GetName(),
        Namespace: evt.Meta.GetNamespace(),
    )


可以看到 Kubebuidler 为我们注册的 Handler 就是将发生变更的对象的 NamespacedName 入队列,如果在 Reconcile 逻辑中需要判断创建/更新/删除,需要有自己的判断逻辑。

  1. 注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error 
    ...
    log.Info("Starting EventSource", "controller", c.Name, "source", src)
    return src.Start(evthdler, c.Queue, prct...)

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
    ...
    is.Informer.AddEventHandler(internal.EventHandlerQueue: queue, EventHandler: handler, Predicates: prct)
    return nil


我们的 Handler 实际注册到 Informer 上面,这样整个逻辑就串起来了,通过 Cache 我们创建了所有 Scheme 里面 GVKs 的 Informers,然后对应 GVK 的 Controller 注册了 Watch Handler 到对应的 Informer,这样一来对应的 GVK 里面的资源有变更都会触发 Handler,将变更事件写到 Controller 的事件队列中,之后触发我们的 Reconcile 方法。

Manager 启动

func (cm *controllerManager) Start(stop <-chan struct) error 
    ...
    go cm.startNonLeaderElectionRunnables()
    ...

func (cm *controllerManager) startNonLeaderElectionRunnables() 
    ...
    // Start the Cache. Allow the function to start the cache to be mocked out for testing
    if cm.startCache == nil 
        cm.startCache = cm.cache.Start
    
    go func() 
        if err := cm.startCache(cm.internalStop); err != nil 
            cm.errChan <- err
        
    ()
        ...
        // Start Controllers
    for _, c := range cm.nonLeaderElectionRunnables 
        ctrl := c
        go func() 
            cm.errChan <- ctrl.Start(cm.internalStop)
        ()
    
    cm.started = true


主要就是启动 Cache,Controller,将整个事件流运转起来,我们下面来看看启动逻辑。

Cache 启动

func (ip *specificInformersMap) Start(stop <-chan struct) 
    func() 
        ...
        // Start each informer
        for _, informer := range ip.informersByGVK 
            go informer.Informer.Run(stop)
        
    ()

func (s *sharedIndexInformer) Run(stopCh <-chan struct) 
        ...
        // informer push resource obj CUD delta to this fifo queue
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    cfg := &Config
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
                // handler to process delta
        Process: s.HandleDeltas,
    
    func() 
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
                // this is internal controller process delta generate by reflector
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    ()
        ...
    wg.StartWithChannel(processorStopCh, s.processor.run)
    s.controller.Run(stopCh)

func (c *controller) Run(stopCh <-chan struct) 
    ...
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    ...
        // reflector is delta producer
    wg.StartWithChannel(stopCh, r.Run)
        // internal controller's processLoop is comsume logic
    wait.Until(c.processLoop, time.Second, stopCh)


Cache 的初始化核心是初始化所有的 Informer,Informer 的初始化核心是创建了 reflector 和内部 controller,reflector 负责监听 Api Server 上指定的 GVK,将变更写入 delta 队列中,可以理解为变更事件的生产者,内部 controller 是变更事件的消费者,他会负责更新本地 indexer,以及计算出 CUD 事件推给我们之前注册的 Watch Handler。

Controller 启动

// Start implements controller.Controller
func (c *Controller) Start(stop <-chan struct) error 
    ...
    for i := 0; i < c.MaxConcurrentReconciles; i++ 
        // Process work items
        go wait.Until(func() 
            for c.processNextWorkItem() 
            
        , c.JitterPeriod, stop)
    
    ...

func (c *Controller) processNextWorkItem() bool 
    ...
    obj, shutdown := c.Queue.Get()
    ...
    var req reconcile.Request
    var ok bool
    if req, ok = obj.(reconcile.Request); 
        ...
    // RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
    // resource to be synced.
    if result, err := c.Do.Reconcile(req); err != nil 
        c.Queue.AddRateLimited(req)
        ...
     
        ...


Controller 的初始化是启动 goroutine 不断地查询队列,如果有变更消息则触发到我们自定义的 Reconcile 逻辑。

整体逻辑串连


上面我们通过源码阅读已经十分清楚整个流程,但是正所谓一图胜千言,我制作了一张整体逻辑串连图(图 3)来帮助大家理解:

技术图片

图 3-Kubebuidler 整体逻辑串连图


Kubebuilder 作为脚手架工具已经为我们做了很多,到最后我们只需要实现 Reconcile 方法即可,这里不再赘述。

守得云开见月明

刚开始使用 Kubebuilder 的时候,因为封装程度很高,很多事情都是懵逼状态,剖析完之后很多问题就很明白了,比如开头提出的几个:

  • 如何同步自定义资源以及 K8s build-in 资源?

需要将自定义资源和想要 Watch 的 K8s build-in 资源的 GVKs 注册到 Scheme 上,Cache 会自动帮我们同步。

  • Controller 的 Reconcile 方法是如何被触发的?

通过 Cache 里面的 Informer 获取资源的变更事件,然后通过两个内置的 Controller 以生产者消费者模式传递事件,最终触发 Reconcile 方法。

  • Cache 的工作原理是什么?

GVK -> Informer 的映射,Informer 包含 Reflector 和 Indexer 来做事件监听和本地缓存。


还有很多问题我就不一一说了,总之,现在 Kubebuilder 现在不再是黑盒。

同类工具对比

Operator Framework?与 Kubebuilder 很类似,这里因为篇幅关系不再展开。

最佳实践

模式

  1. 使用 OwnerRefrence 来做资源关联,有两个特性:
  • Owner 资源被删除,被 Own 的资源会被级联删除,这利用了 K8s 的 GC;
  • 被 Own 的资源对象的事件变更可以触发 Owner 对象的 Reconcile 方法;
  1. 使用 Finalizer 来做资源的清理。

注意点

  • 不使用 Finalizer 时,资源被删除无法获取任何信息;
  • 对象的 Status 字段变化也会触发 Reconcile 方法;
  • Reconcile 逻辑需要幂等;

优化

使用 IndexFunc 来优化资源查询的效率
?

总结

通过深入分析,我们可以看到 Kubebuilder 提供的功能对于快速编写 CRD 和 Controller 是十分有帮助的,无论是 Istio、Knative 等知名项目还是各种自定义 Operators,都大量使用了 CRD,将各种组件抽象为 CRD,Kubernetes 变成控制面板将成为一个趋势,希望本文能够帮助大家理解和把握这个趋势。


“ 阿里巴巴云原生微信公众号(ID:Alicloudnative)关注微服务、Serverless、容器、Service Mesh等技术领域、聚焦云原生流行技术趋势、云原生大规模的落地实践,做最懂云原生开发者的技术公众号。”

以上是关于kubebuilder笔记的主要内容,如果未能解决你的问题,请参考以下文章

Kubebuilder 使用记录

9. kubebuilder 进阶: 源码分析

kubebuilder 实战之开发一个存储用户信息的 operator

深入解析 Kubebuilder:让编写 CRD 变得更简单

Kubebuilder介绍:使用CRD构建Kubernetes API的SDK

是否可以在运行时更新 zap 记录器的日志级别?