golang 将CRD与动态go客户端一起使用的示例

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang 将CRD与动态go客户端一起使用的示例相关的知识,希望对你有一定的参考价值。

package main

import (
	"fmt"
	"log"
	"os/user"
	"path/filepath"
	"strings"

	apixv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
	apixv1beta1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

var (
	runtimeClassGVR = schema.GroupVersionResource{
		Group:    "node.k8s.io",
		Version:  "v1alpha1",
		Resource: "runtimeclasses",
	}
)

func main() {
	log.Print("Loading client config")
	config, err := clientcmd.BuildConfigFromFlags("", userConfig())
	errExit("Failed to load client conifg", err)

	log.Print("Loading dynamic client")
	client, err := dynamic.NewForConfig(config)
	errExit("Failed to create client", err)

	RegisterRuntimeClassCRD(config)
	CreateSampleRuntimeClasses(client)
	PrintRuntimeHandlers(client)
}

func RegisterRuntimeClassCRD(config *rest.Config) {
	apixClient, err := apixv1beta1client.NewForConfig(config)
	errExit("Failed to load apiextensions client", err)

	crds := apixClient.CustomResourceDefinitions()

	const (
		dns1123LabelFmt        string = "[a-z0-9]([-a-z0-9]*[a-z0-9])?"
		dns1123SubdomainFmt    string = dns1123LabelFmt + "(\\." + dns1123LabelFmt + ")*"
		dns1123SubdomainRegexp string = "^" + dns1123SubdomainFmt + "$"
	)
	runtimeClassCRD := &apixv1beta1.CustomResourceDefinition{
		ObjectMeta: metav1.ObjectMeta{
			Name: "runtimeclasses.node.k8s.io",
		},
		Spec: apixv1beta1.CustomResourceDefinitionSpec{
			Group:   "node.k8s.io",
			Version: "v1alpha1",
			Versions: []apixv1beta1.CustomResourceDefinitionVersion{{
				Name:    "v1alpha1",
				Served:  true,
				Storage: true,
			}},
			Names: apixv1beta1.CustomResourceDefinitionNames{
				Plural:   "runtimeclasses",
				Singular: "runtimeclass",
				Kind:     "RuntimeClass",
			},
			Scope: apixv1beta1.ClusterScoped,
			Validation: &apixv1beta1.CustomResourceValidation{
				OpenAPIV3Schema: &apixv1beta1.JSONSchemaProps{
					Properties: map[string]apixv1beta1.JSONSchemaProps{
						"spec": {
							Properties: map[string]apixv1beta1.JSONSchemaProps{
								"runtimeHandler": {
									Type:    "string",
									Pattern: dns1123SubdomainRegexp,
								},
							},
						},
					},
				},
			},
		},
	}
	log.Print("Registering RuntimeClass CRD")
	_, err = crds.Create(runtimeClassCRD)
	if err != nil {
		if errors.IsAlreadyExists(err) {
			log.Print("RuntimeClass CRD already registered")
		} else {
			errExit("Failed to create RuntimeClass CRD", err)
		}
	}
}

func CreateSampleRuntimeClasses(client dynamic.Interface) {
	res := client.Resource(runtimeClassGVR)

	rcs := map[string]string{
		"native":  "runc",
		"sandbox": "gvisor",
		"vm":      "kata-containers",
		"foo":     "bar",
	}
	for name, handler := range rcs {
		log.Printf("Creating RuntimeClass %s", name)
		rc := NewRuntimeClass(name, handler)
		_, err := res.Create(rc, metav1.CreateOptions{})
		errExit(fmt.Sprintf("Failed to create RuntimeClass %#v", rc), err)
	}
}

func NewRuntimeClass(name, handler string) *unstructured.Unstructured {
	return &unstructured.Unstructured{
		Object: map[string]interface{}{
			"kind":       "RuntimeClass",
			"apiVersion": runtimeClassGVR.Group + "/v1alpha1",
			"metadata": map[string]interface{}{
				"name": name,
			},
			"spec": map[string]interface{}{
				"runtimeHandler": handler,
			},
		},
	}
}

func PrintRuntimeHandlers(client dynamic.Interface) {
	PrintResourceField(client, runtimeClassGVR, "spec", "runtimeHandler")
}

func PrintResourceField(client dynamic.Interface, gvr schema.GroupVersionResource, fldPath ...string) {
	rs := fmt.Sprintf("%s/%s", gvr.Group, gvr.Resource)
	log.Printf("Listing %s objects", rs)
	res := client.Resource(gvr)
	list, err := res.List(metav1.ListOptions{})
	errExit("Failed to list "+rs+" objects", err)

	log.Printf("Printing %s.%s", rs, strings.Join(fldPath, "."))
	output := make(map[string]string)
	for _, item := range list.Items {
		name := item.GetName()
		fld, exists, err := unstructured.NestedString(item.Object, fldPath...)
		if err != nil {
			log.Printf("Error reading %s for %s: %v", strings.Join(fldPath, "."), name, err)
			continue
		}
		if !exists {
			fld = "[NOT FOUND]"
		}
		output[name] = fld
	}

	for name, fld := range output {
		fmt.Printf("  %-10s  -->  %-10s\n", name, fld)
	}
}

func errExit(msg string, err error) {
	if err != nil {
		log.Fatalf("%s: %#v", msg, err)
	}
}

func userConfig() string {
	usr, err := user.Current()
	errExit("Failed to get current user", err)

	return filepath.Join(usr.HomeDir, ".kube", "config")
}

将频道与 google pubsub poll 订阅者一起使用

【中文标题】将频道与 google pubsub poll 订阅者一起使用【英文标题】:using channels with google pubsub poll subscriber 【发布时间】:2019-01-14 00:05:00 【问题描述】:

我正在尝试在 golang 中创建一个 google pubsub 订阅者,我一次接收 100 条消息,然后将它们写入 influx。我正在尝试使用频道来执行此操作:

package main

import (
    "os"
    "fmt"
    "cloud.google.com/go/pubsub"
    "log"
    "sync"
    "golang.org/x/net/context"
    "encoding/json"
    clnt "github.com/influxdata/influxdb/client/v2"
    "time"
)

type SensorData struct 
    Pressure      float64 `json:"pressure"`
    Temperature   float64 `json:"temperature"`
    Dewpoint      float64 `json:"dewpoint"`
    Timecollected int64   `json:"timecollected"`
    Latitude      float64 `json:"latitude"`
    Longitude     float64 `json:"longitude"`
    Humidity      float64 `json:"humidity"`
    SensorID      string  `json:"sensorId"`
    Zipcode       int     `json:"zipcode"`
    Warehouse     string  `json:"warehouse"`
    Area          string  `json:"area"`


type SensorPoints struct 
    SensorData      []SensorData


func main () 

    messages := make(chan SensorData, 100)

    // Create a new Influx HTTPClient
    c, err := clnt.NewHTTPClient(clnt.HTTPConfig
        Addr:     "http://localhost:8086",
        Username: "user",
        Password: "pass",
    )
    if err != nil 
        log.Fatal(err)
    


    // Create pubsub subscriber
    ctx := context.Background()
    proj := os.Getenv("GOOGLE_CLOUD_PROJECT")
    if proj == "" 
        fmt.Fprintf(os.Stderr, "GOOGLE_CLOUD_PROJECT environment variable must be set.\n")
        os.Exit(1)
    
    client, err := pubsub.NewClient(ctx, proj)
    if err != nil 
        log.Fatalf("Could not create pubsub Client: %v", err)
    
    const sub = "influxwriter"


    //create influx a blank batchpoint set
    bp, err := clnt.NewBatchPoints(clnt.BatchPointsConfig
        Database:  "sensordata",
        Precision: "s",
    )
    if err != nil 
        log.Fatal(err)
    



    // Pull messages via the subscription.
    go pullMsgs(client, sub, messages)
    if err != nil 
        log.Fatal(err)
    

    writeInflux(messages, bp)

    c.Close()




func pullMsgs(client *pubsub.Client, name string, messages chan<- SensorData) 
    ctx := context.Background()

    // [START pubsub_subscriber_async_pull]
    // [START pubsub_quickstart_subscriber]
    // Consume 10 messages.

    var mu sync.Mutex
    var sensorinfos SensorPoints
    sensorinfo := &SensorData

    received := 0
    sub := client.Subscription(name)
    cctx, _ := context.WithCancel(ctx)
    err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) 
        msg.Ack()

        json.Unmarshal(msg.Data, sensorinfo)

        //fmt.Println(string(msg.Data))
        //fmt.Println(sensorinfo.SensorID)

        sensorinfos.SensorData = append(sensorinfos.SensorData, *sensorinfo)

        mu.Lock()
        defer mu.Unlock()
        received++
        fmt.Println("rcv: ", received)
        messages <- *sensorinfo

    )
    if err != nil 
        fmt.Println(err)
    
    // [END pubsub_subscriber_async_pull]
    // [END pubsub_quickstart_subscriber]


func writeInflux(sensorpoints <- chan SensorData, bp clnt.BatchPoints) 

    for p := range sensorpoints 

        // Create a point and add to batch
        tags := map[string]string
            "sensorId": p.SensorID,
            "warehouse": p.Warehouse,
            "area": p.Area,
            "zipcode": string(p.Zipcode),
        

        fields := map[string]interface
            "pressure":   p.Pressure,
            "humidity": p.Humidity,
            "temperature":   p.Temperature,
            "dewpoint":   p.Dewpoint,
            "longitude":   p.Longitude,
            "latitude":   p.Latitude,
        

        pt, err := clnt.NewPoint("sensordata", tags, fields, time.Unix(p.Timecollected, 0))
        if err != nil 
            log.Fatal(err)
        
        bp.AddPoint(pt)


    



但它并没有看到每个人都能通过最初的 pullMsgs 函数,只是继续在那里打印输出:

rcv:  1
rcv:  2
rcv:  3
rcv:  4
rcv:  5
rcv:  6
rcv:  7

我认为一旦通道满了,它应该阻塞直到通道被清空

这是我用作参考的 pubsub 拉取代码。

【问题讨论】:

回复:“我认为一旦通道满了,它应该阻塞直到通道被清空”......几乎。一旦通道已满,发送操作将阻塞,直到通道未满——不必为空。那么你看到了什么,rcv: 100 它卡住了?程序是否退出?您是否将打印语句作为writeInflux 函数的第一行并确认它永远不会被调用?顺便说一句,errpullMsgs 调用之后被检查之前似乎没有更新。 @jrefior,naw,程序没有退出...随着消息数不断增加,它会继续运行rcv: n...再看一下谷歌文档(@987654321 @),看起来订阅者本身是非阻塞的,但我不确定如何真正处理它? 【参考方案1】:

当您在频道上发送了所需数量的消息后,请关闭频道并取消上下文。尝试使用the documentation 中演示的技术,在收到一定数量的消息后取消。由于您的缓冲区为 100,并且您尝试一次使用 100 条消息,所以这就是数字。如果您希望程序退出,请关闭通道,以便 writeInflux 中的 for e := range ch 循环到达停止点并且不会阻塞等待更多元素添加到通道中。

请在the Go pubsub API doc 中注意这一点:

要终止对 Receive 的调用,请取消其上下文。

这不是阻碍你的主 goroutine 的原因,但你的 pullMsgs goroutine 不会在没有取消的情况下自行退出。

另外,检查Unmarshal 上的错误。如果此时您不想在代码中处理解组错误,请考虑更改通道类型并改为发送msgmsg.Data,并在收到通道时解组。

cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) 
    msg.Ack()
    err := json.Unmarshal(msg.Data, sensorinfo)
    if err != nil 
         fmt.Printf("Failed to unmarshal: %s\n", err)
    
    mu.Lock()
    defer mu.Unlock()
    received++
    fmt.Println("rcv: ", received)
    messages <- *sensorinfo
    if received == 100 
        close(messages)  // no more messages will be sent on channel
        cancel()
    

【讨论】:

所以如果我只想编程只是在消息被放入 pubsub 时永远运行我该怎么办..我试图删除 cancel() 但后来得到了这个:panic: send on closed channel 我也尝试像这样重新创建频道:` for var messages = make(chan SensorData, 100) // 通过订阅拉取消息。 pullMsgs(client, sub, messages) if err != nil log.Fatal(err) writeInflux(messages, bp) ` 但这似乎也不起作用......我如何保持这个运行?跨度>

以上是关于golang 将CRD与动态go客户端一起使用的示例的主要内容,如果未能解决你的问题,请参考以下文章

将 pubsub 与 golang 一起使用:ocgrpc.NewClientStatsHandler

GoLang接口---下

一文带您了解 Client-Go 的四种客户端

Go GRPC 入门

Golang yaml与toml解析

快速提升Golang编程能力:那就一起用Go做项目吧