client-go gin的简单整合四-list-watch初探

Posted 对你无可奈何

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了client-go gin的简单整合四-list-watch初探相关的知识,希望对你有一定的参考价值。

背景:

完成了client-go gin的简单整合三(list列表相关再进阶关于Pods),恩如果有代理是可以看到每次的请求都要访问后端服务的,如何避免频繁调用后端apiserver呢?list-watch监听机制可以使用一下?

关于list-watch:

参照:https://blog.51cto.com/u_15127559/3377812(错别字好多?最后还是引用了沈老师的ppt上面的概念!),

  • list http短链接调用资源的api,获取列表。
  • 使用http长连接持续监听资源,有变化则返回一个WatchEvent

    client-go informer

    client-go k8s.io/client-go/tools/cache包informer对象对list-watch机制进行了封装

  • 初始化调用List api获得全量list 缓存(本地缓存)
  • 调用watch api watch资源,当资源发生变更通过一定机制维护缓存,减少访问apiserver的压力

个人觉得不错的文章Client-go源码分析之Reflector,华为云不错的视频list-watch机制原理详解,
client-go(kubernetes)的ListerWatcher解析.

client-go gin的简单整合四-list-watch

以deployment简单例子的开始

文件名 /src/service/test.go,监控deployment的变化开始其实是不是可以跟java是的弄一个单独的测试包?这里就简单操作了偷懒......

cache.NewInformer()开始:

cache.NewInformer()


goland查看源码功能:



实现Handler方法:

只实现了OnUpdate方法(仅仅打印deployment名字),OnAdd,OnDelete是空的:

type DepHandler struct 


func (d *DepHandler) OnAdd(obj interface) 
func (d *DepHandler) OnUpdate(oldObj, newObj interface) 
    if dep, ok := newObj.(*v1.Deployment); ok 
        fmt.Println(dep.Name)
    

func (d *DepHandler) OnDelete(obj interface) 

最终test.go如下:

package main

import (
    "fmt"
    "k8s-demo1/src/lib"
    v1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/cache"
)

type DepHandler struct 


func (d *DepHandler) OnAdd(obj interface) 
func (d *DepHandler) OnUpdate(oldObj, newObj interface) 
    if dep, ok := newObj.(*v1.Deployment); ok 
        fmt.Println(dep.Name)
    

func (d *DepHandler) OnDelete(obj interface) 

func main() 
    s, c := cache.NewInformer(cache.NewListWatchFromClient(lib.K8sClient.AppsV1().RESTClient(),
        "deployments", "default", fields.Everything()),
        &v1.Deployment,
        0,
        &DepHandler,
    )
    c.Run(wait.NeverStop)
    s.List()

关于s c 源码中Store, Controller

运行test.go

go run test.go
手动修改nginx deployment副本数量,查看 goland输出:

[zhangpeng@zhangpeng ~]$ kubectl get deployments
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl get deployment
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled
[zhangpeng@zhangpeng ~]$ 


注:resource 是deployments,不能是deployment, *DepHandler 为什么加指针运算符?

实现一个pod的list-watch?

首先停止test.go,并注释掉test.go中代码。

创建了一个/src/service/test1.go,照着test.go来一遍:

package main

import (
    "fmt"
    "k8s-demo1/src/lib"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/cache"
)

type PodHandler struct 


func (p *PodHandler) OnAdd(obj interface) 
func (p *PodHandler) OnUpdate(oldObj, newObj interface) 
    if pods, ok := newObj.(*corev1.Pod); ok 
        fmt.Println(pods.Name)
    

func (p *PodHandler) OnDelete(obj interface) 

func main() 
    s, c := cache.NewInformer(cache.NewListWatchFromClient(lib.K8sClient.CoreV1().RESTClient(),
        "pods", "default", fields.Everything()),
        &corev1.Pod,
        0,
        &PodHandler,
    )
    c.Run(wait.NeverStop)
    s.List()

注意:pod 的api是 corev1。参照:https://github.com/kubernetes/client-go/blob/release-1.23/informers/core/v1/pod.go#L58
运行test1.go,修改default 下nginx deployment副本数:

[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled

SharedInformerFactory工厂模式

思考一下为什么要使用工厂模式呢?
关于SharedInformerFactory参考:https://stackoverflow.com/questions/40975307/how-to-watch-events-on-a-kubernetes-service-using-its-go-client,https://qiankunli.github.io/2020/07/20/client_go.html

先test1.go的pod开始:



src/service/test1.go

package main

import (
    "fmt"
    "k8s-demo1/src/lib"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
)

type PodHandler struct 


func (p *PodHandler) OnAdd(obj interface) 
func (p *PodHandler) OnUpdate(oldObj, newObj interface) 
    if pods, ok := newObj.(*corev1.Pod); ok 
        fmt.Println(pods.Name)
    

func (p *PodHandler) OnDelete(obj interface) 

func main() 
    factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
    podinformer := factory.Core().V1().Pods()
    podinformer.Informer().AddEventHandler(&PodHandler)
    factory.Start(wait.NeverStop)
    select 

写的时候以为直接corev1......发现是core().v1(),为什么要用select呢?参照:https://blog.csdn.net/cbmljs/article/details/93497415。阻塞,防止程序退出!运行test1.go程序,修改nginx deployment副本:

[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=4
deployment.apps/nginx scaled


可不可以deployment pod list-watch搞在一起呢?现在写了一个test.go,test1.go?貌似也是可以的......先搞在一起,如下:

package main

import (
    "fmt"
    "k8s-demo1/src/lib"
    v1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
)

type PodHandler struct 


func (p *PodHandler) OnAdd(obj interface) 
func (p *PodHandler) OnUpdate(oldObj, newObj interface) 
    if pods, ok := newObj.(*corev1.Pod); ok 
        fmt.Println(pods.Name)
    

func (p *PodHandler) OnDelete(obj interface) 


type DepHandler struct 


func (d *DepHandler) OnAdd(obj interface) 
func (d *DepHandler) OnUpdate(oldObj, newObj interface) 
    if dep, ok := newObj.(*v1.Deployment); ok 
        fmt.Println(dep.Name)
    

func (d *DepHandler) OnDelete(obj interface) 

func main() 
    factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
    podinformer := factory.Core().V1().Pods()
    podinformer.Informer().AddEventHandler(&PodHandler)
    depinformer := factory.Apps().V1().Deployments()
    depinformer.Informer().AddEventHandler(&DepHandler)
    factory.Start(wait.NeverStop)
    select 

运行test1.go ,更改nginx deployment副本数量,打印了deployment name 和pod name!

Handler OnAdd

补全一下OnAdd方法,打印一下pod deployment列表:

func (p *PodHandler) OnAdd(obj interface) 
    fmt.Println(obj.(*corev1.Pod).Name)

func (d *DepHandler) OnAdd(obj interface) 
    fmt.Println(obj.(*v1.Deployment).Name)
package main

import (
    "fmt"
    "k8s-demo1/src/lib"
    v1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
)

type PodHandler struct 


func (p *PodHandler) OnAdd(obj interface) 
    fmt.Println(obj.(*corev1.Pod).Name)

func (p *PodHandler) OnUpdate(oldObj, newObj interface) 
    if pods, ok := newObj.(*corev1.Pod); ok 
        fmt.Println(pods.Name)
    

func (p *PodHandler) OnDelete(obj interface) 


type DepHandler struct 


func (d *DepHandler) OnAdd(obj interface) 
    fmt.Println(obj.(*v1.Deployment).Name)

func (d *DepHandler) OnUpdate(oldObj, newObj interface) 
    if dep, ok := newObj.(*v1.Deployment); ok 
        fmt.Println(dep.Name)
    

func (d *DepHandler) OnDelete(obj interface) 

func main() 
    factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
    podinformer := factory.Core().V1().Pods()
    podinformer.Informer().AddEventHandler(&PodHandler)
    depinformer := factory.Apps().V1().Deployments()
    depinformer.Informer().AddEventHandler(&DepHandler)
    factory.Start(wait.NeverStop)
    select 

sync.Map

为什么引用sync.Map呢?Go语言sync.Map(在并发环境中使用的map),还是考虑并发原因!
拿deployment为例,打印一下develop namespace命名空间下的deployment列表:
test1.go

package main

import (
    "context"
    "fmt"
    "github.com/gin-gonic/gin"
    "k8s-demo1/src/lib"
    v1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "log"
    "sync"
    "time"
)

type DeploymentMap struct 
    data sync.Map


func (depmap *DeploymentMap) Add(dep *v1.Deployment) 
    if list, ok := depmap.data.Load(dep.Namespace); ok 
        list = append(list.([]*v1.Deployment), dep)
        depmap.data.Store(dep.Namespace, list)
     else 
        depmap.data.Store(dep.Namespace, []*v1.Deploymentdep)
    


type DepHandler struct 


func (d *DepHandler) OnAdd(obj interface) 
    //fmt.Println(obj.(*v1.Deployment).Name)
    DepMap.Add(obj.(*v1.Deployment))

func (d *DepHandler) OnUpdate(oldObj, newObj interface) 
    if dep, ok := newObj.(*v1.Deployment); ok 
        fmt.Println(dep.Name)
    

func (d *DepHandler) OnDelete(obj interface) 


var DepMap *DeploymentMap

func init() 
    DepMap = &DeploymentMap

func main() 
    factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
    depinformer := factory.Apps().V1().Deployments()
    depinformer.Informer().AddEventHandler(&DepHandler)
    factory.Start(wait.NeverStop)
    c, _ := context.WithTimeout(context.Background(), time.Second*3)
    select 
    case <-c.Done():
        log.Fatal("time out")
    default:
        r := gin.New()
        r.GET("/", func(c *gin.Context) 
            var res []string
            DepMap.data.Range(func(key, value interface) bool 
                if key == "develop" 
                    for _, item := range value.([]*v1.Deployment) 
                        res = append(res, item.Name)
                    
                
                return true

            )
            c.JSON(200, res)

        )
        r.Run(":8080")
    

没有作具体的路由,直接测试一下访问:http://127.0.0.1:8080/

总结

  1. list-watch机制
  2. cache informer,informer工厂模式。
  3. handler实现
  4. sync.map
  5. 断言......

以上是关于client-go gin的简单整合四-list-watch初探的主要内容,如果未能解决你的问题,请参考以下文章

client-go gin的简单整合四-list-watch初探

client-go gin的简单整合五-list-watch deployment应用

client-go gin的简单整合五-list-watch deployment应用

client-go gin的简单整合二(list列表相关进一步操作)

client-go gin的简单整合三(list列表相关再进阶关于Pods)

client-go gin的简单整合三(list列表相关再进阶关于Pods)