KubeEdge Source Code Analysis

Posted by Kris_u


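Core ideas of KubeEdge:

Advantages:

  • Native Kubernetes support: with KubeEdge, users can orchestrate applications, manage devices, and monitor application/device status on edge nodes, just as they would with a Kubernetes cluster in the cloud.

  • Reliable cloud-edge collaboration: messages are delivered reliably, without loss, even over an unstable cloud-edge network.

  • Edge autonomy: when the cloud-edge network is unstable, or the edge goes offline or restarts, edge nodes keep running autonomously and the applications on the edge keep working normally.

  • Edge device management: edge devices are managed through the native Kubernetes API, using CRDs.

  • Extremely lightweight edge agent: EdgeCore is a very lightweight agent that runs on resource-constrained edge nodes.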

KubeEdge architecture:

 

 CSI is short for Container Storage Interface.

CSI Driver from KubeEdge: this is more like a CSI driver proxy. It implements all of the CSI Identity and Controller interfaces and sends messages to CloudHub, which forwards them to the edge. All of the Volume Lifecycle actions are actually executed in the CSI Driver from Vendor at the edge.

1. CSI specification:

https://github.com/container-storage-interface/spec

KubeEdge cloud components:

The EdgeController struct:

// EdgeController use beehive context message layer
type EdgeController struct {
	config     v1alpha1.EdgeController
	upstream   *controller.UpstreamController
	downstream *controller.DownstreamController
}

Starting the EdgeController:

func (ec *EdgeController) Start() {
	if err := ec.upstream.Start(); err != nil {
		klog.Exitf("start upstream failed with error: %s", err)
	}

	if err := ec.downstream.Start(); err != nil {
		klog.Exitf("start downstream failed with error: %s", err)
	}
}

// Start UpstreamController

func (uc *UpstreamController) Start() error {
	klog.Info("start upstream controller")

	go uc.dispatchMessage()

	for i := 0; i < int(uc.config.Load.UpdateNodeStatusWorkers); i++ {
		go uc.updateNodeStatus()
	}
	for i := 0; i < int(uc.config.Load.UpdatePodStatusWorkers); i++ {
		go uc.updatePodStatus()
	}
	for i := 0; i < int(uc.config.Load.QueryConfigMapWorkers); i++ {
		go uc.queryConfigMap()
	}
	for i := 0; i < int(uc.config.Load.QuerySecretWorkers); i++ {
		go uc.querySecret()
	}
	for i := 0; i < int(uc.config.Load.ServiceAccountTokenWorkers); i++ {
		go uc.processServiceAccountToken()
	}
	for i := 0; i < int(uc.config.Load.QueryPersistentVolumeWorkers); i++ {
		go uc.queryPersistentVolume()
	}
	for i := 0; i < int(uc.config.Load.QueryPersistentVolumeClaimWorkers); i++ {
		go uc.queryPersistentVolumeClaim()
	}
	for i := 0; i < int(uc.config.Load.QueryVolumeAttachmentWorkers); i++ {
		go uc.queryVolumeAttachment()
	}
	for i := 0; i < int(uc.config.Load.QueryNodeWorkers); i++ {
		go uc.queryNode()
	}
	for i := 0; i < int(uc.config.Load.UpdateNodeWorkers); i++ {
		go uc.updateNode()
	}
	for i := 0; i < int(uc.config.Load.DeletePodWorkers); i++ {
		go uc.deletePod()
	}
	for i := 0; i < int(uc.config.Load.UpdateRuleStatusWorkers); i++ {
		go uc.updateRuleStatus()
	}
	return nil
}
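
The shape of this Start() is one dispatcher goroutine plus a configurable number of workers per message type. The following is a minimal, self-contained sketch of that pattern, not the KubeEdge implementation: the Message type, the resource type strings, and the helper names (runDispatcher, startWorkers, process) are all made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a simplified stand-in for beehive's model.Message (assumption).
type Message struct {
	ResourceType string // illustrative values: "nodestatus", "podstatus"
	Content      string
}

// runDispatcher routes each message to the channel registered for its
// resource type, then closes every channel once the input is exhausted.
func runDispatcher(in <-chan Message, routes map[string]chan Message) {
	for msg := range in {
		if ch, ok := routes[msg.ResourceType]; ok {
			ch <- msg
		}
		// Unknown resource types are silently dropped in this sketch.
	}
	for _, ch := range routes {
		close(ch)
	}
}

// startWorkers launches n goroutines that drain ch and call handle.
func startWorkers(n int, ch <-chan Message, handle func(Message), wg *sync.WaitGroup) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range ch {
				handle(msg)
			}
		}()
	}
}

// process runs the whole pipeline and returns how many messages of each
// known type were handled.
func process(msgs []Message, workers int) map[string]int {
	routes := map[string]chan Message{
		"nodestatus": make(chan Message, 8),
		"podstatus":  make(chan Message, 8),
	}
	var mu sync.Mutex
	counts := map[string]int{}
	var wg sync.WaitGroup
	for _, ch := range routes {
		startWorkers(workers, ch, func(m Message) {
			mu.Lock()
			counts[m.ResourceType]++
			mu.Unlock()
		}, &wg)
	}

	in := make(chan Message)
	go runDispatcher(in, routes)
	for _, m := range msgs {
		in <- m
	}
	close(in)
	wg.Wait()
	return counts
}

func main() {
	counts := process([]Message{
		{"nodestatus", "a"}, {"podstatus", "b"}, {"podstatus", "c"}, {"unknown", "d"},
	}, 2)
	fmt.Println(counts["nodestatus"], counts["podstatus"])
}
```

Giving each message type its own channel and worker count lets a slow handler (say, a PersistentVolume query) back up without starving node status updates, which is presumably why the real controller exposes a separate worker setting per type.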

// Start DownstreamController

func (dc *DownstreamController) Start() error {
	klog.Info("start downstream controller")
	// pod
	go dc.syncPod()
	// configmap
	go dc.syncConfigMap()
	// secret
	go dc.syncSecret()
	// nodes
	go dc.syncEdgeNodes()
	// rule
	go dc.syncRule()
	// ruleendpoint
	go dc.syncRuleEndpoint()

	return nil
}

 DeviceController

It comprises an Upstream Controller and a Downstream Controller.

// DeviceController use beehive context message layer
type DeviceController struct {
	downstream *controller.DownstreamController
	upstream   *controller.UpstreamController
	enable     bool
}

cloud/pkg/devicecontroller/controller/upstream.go: UpstreamController

The UpstreamController subscribes to messages from the edge and syncs them to the Kubernetes API server.

// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
	crdClient    crdClientset.Interface
	messageLayer messagelayer.MessageLayer
	// message channel
	deviceStatusChan chan model.Message

	// downstream controller to update device status in cache
	dc *DownstreamController
}

cloud/pkg/devicecontroller/controller/downstream.go: DownstreamController

The DownstreamController watches the Kubernetes API server and sends changes to the edge nodes.

// DownstreamController watch kubernetes api server and send change to edge
type DownstreamController struct {
	kubeClient   kubernetes.Interface
	messageLayer messagelayer.MessageLayer

	deviceManager      *manager.DeviceManager
	deviceModelManager *manager.DeviceModelManager
	configMapManager   *manager.ConfigMapManager
}

 DownstreamController.Start() launches two goroutines:

  • go dc.syncDeviceModel()
  • go dc.syncDevice()

// Start DownstreamController
func (dc *DownstreamController) Start() error {
	klog.Info("Start downstream devicecontroller")

	go dc.syncDeviceModel()

	// Wait for adding all device model
	// TODO need to think about sync
	time.Sleep(1 * time.Second)
	go dc.syncDevice()

	return nil
}
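
The one-second sleep above is there so that device models are loaded before devices are synced, and the TODO admits it is a stopgap. One possible alternative, sketched here under stated assumptions, is to have the model sync close a "ready" channel once its initial load is done. This is not how KubeEdge does it; Device, syncDeviceModels, syncDevices, and start are hypothetical names for illustration.

```go
package main

import "fmt"

// Device is a simplified device that references a device model by name.
type Device struct {
	Name  string
	Model string
}

// syncDeviceModels loads the models into the cache and closes ready once
// the initial load is complete.
func syncDeviceModels(cache map[string]bool, models []string, ready chan<- struct{}) {
	for _, m := range models {
		cache[m] = true
	}
	close(ready) // signal: initial device-model sync finished
}

// syncDevices returns the names of the devices whose model is in the cache.
func syncDevices(cache map[string]bool, devices []Device) []string {
	var synced []string
	for _, d := range devices {
		if cache[d.Model] {
			synced = append(synced, d.Name)
		}
	}
	return synced
}

// start waits on the ready channel instead of sleeping for a fixed time.
func start(models []string, devices []Device) []string {
	cache := map[string]bool{}
	ready := make(chan struct{})
	go syncDeviceModels(cache, models, ready)
	<-ready // blocks exactly as long as the model sync needs
	return syncDevices(cache, devices)
}

func main() {
	synced := start(
		[]string{"temperature-sensor"},
		[]Device{{"d1", "temperature-sensor"}, {"d2", "camera"}},
	)
	fmt.Println(synced)
}
```

Closing the channel gives a happens-before edge between the cache writes and the later reads, so the device sync never sees a half-populated cache, whereas a fixed sleep only makes that unlikely.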
    

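The UpstreamController subscribes to messages from edge nodes and syncs them to the Kubernetes API server:

CustomResourceDefinition (CRD): when you create a new CRD, the Kubernetes API server creates a new RESTful resource path for each version you specify, and you can use that API path to create resources of your own custom types. A CRD can be either namespaced or cluster-scoped, as specified in the CRD's scope field. As with existing built-in objects, deleting a namespace deletes all custom objects in that namespace.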
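
To make the "resource path per version" point concrete, here is a small sketch that builds the path the API server serves for a custom resource, for both scopes. The resourcePath helper is hypothetical, and the group/version/plural values in main are only examples; check the CRDs installed by your KubeEdge version for the real ones.

```go
package main

import "fmt"

// resourcePath builds the REST path under which the API server serves a
// custom resource. An empty namespace means the CRD is cluster-scoped.
func resourcePath(group, version, namespace, plural string) string {
	if namespace == "" {
		return fmt.Sprintf("/apis/%s/%s/%s", group, version, plural)
	}
	return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", group, version, namespace, plural)
}

func main() {
	// Namespaced custom resource (example values).
	fmt.Println(resourcePath("devices.kubeedge.io", "v1alpha2", "default", "devices"))
	// Cluster-scoped custom resource (example values).
	fmt.Println(resourcePath("example.com", "v1", "", "widgets"))
}
```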

// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
	crdClient    crdClientset.Interface
	messageLayer messagelayer.MessageLayer
	// message channel
	deviceStatusChan chan model.Message

	// downstream controller to update device status in cache
	dc *DownstreamController
}

// Start UpstreamController
func (uc *UpstreamController) Start() error {
	klog.Info("Start upstream devicecontroller")

	uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
	go uc.dispatchMessage()

	for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
		go uc.updateDeviceStatus()
	}
	return nil
}

cloudHub:

type cloudHub struct {
	enable               bool
	informersSyncedFuncs []cache.InformerSynced
	messageq             *channelq.ChannelMessageQueue
}

 DispatchMessage(): gets messages from the cloud, parses the node ID out of each one, and puts the message into the message queue to be pushed to the corresponding node.

func (a *cloudHub) Start() {
    ...

	// start dispatch message from the cloud to edge node
	go a.messageq.DispatchMessage()

	...
	// HttpServer mainly used to issue certificates for the edge
	go httpserver.StartHTTPServer()

	servers.StartCloudHub(a.messageq)

	if hubconfig.Config.UnixSocket.Enable {
		// The uds server is only used to communicate with csi driver from kubeedge on cloud.
		// It is not used to communicate between cloud and edge.
		go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
	}
}

// StartCloudHub starts the cloud hub service
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
	handler.InitHandler(messageq)
	// start websocket server
	if hubconfig.Config.WebSocket.Enable {
		go startWebsocketServer()
	}
	// start quic server
	if hubconfig.Config.Quic.Enable {
		go startQuicServer()
	}
}
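
What DispatchMessage() does, per the description above, is extract the node ID from a message and append the message to that node's queue. The sketch below illustrates the idea only. It assumes the message resource is laid out as "node/<nodeID>/...", and the Message, NodeQueues, and nodeIDFromResource names are invented for this example rather than taken from channelq.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Message is a simplified stand-in for model.Message (assumption).
type Message struct {
	Resource string // assumed layout: "node/<nodeID>/<namespace>/<type>/<name>"
	Content  string
}

// nodeIDFromResource extracts the node ID from the resource string.
func nodeIDFromResource(resource string) (string, error) {
	parts := strings.Split(resource, "/")
	if len(parts) < 2 || parts[0] != "node" || parts[1] == "" {
		return "", errors.New("resource does not contain a node ID: " + resource)
	}
	return parts[1], nil
}

// NodeQueues keeps one FIFO queue per node. It is not safe for concurrent
// use; a real implementation would need locking or a concurrent map.
type NodeQueues struct {
	queues map[string][]Message
}

// NewNodeQueues returns an empty set of per-node queues.
func NewNodeQueues() *NodeQueues {
	return &NodeQueues{queues: map[string][]Message{}}
}

// Dispatch appends msg to the queue of the node it is addressed to.
func (q *NodeQueues) Dispatch(msg Message) error {
	id, err := nodeIDFromResource(msg.Resource)
	if err != nil {
		return err
	}
	q.queues[id] = append(q.queues[id], msg)
	return nil
}

// Pending reports how many messages are waiting for nodeID.
func (q *NodeQueues) Pending(nodeID string) int {
	return len(q.queues[nodeID])
}

func main() {
	q := NewNodeQueues()
	_ = q.Dispatch(Message{Resource: "node/edge-1/default/pod/nginx"})
	_ = q.Dispatch(Message{Resource: "node/edge-1/default/configmap/app"})
	_ = q.Dispatch(Message{Resource: "node/edge-2/default/pod/redis"})
	fmt.Println(q.Pending("edge-1"), q.Pending("edge-2"))
	fmt.Println(q.Dispatch(Message{Resource: "invalid"}))
}
```

Keeping a separate queue per node means a disconnected edge node only accumulates its own backlog and does not block delivery to the others.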
	

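KubeEdge edge components:

1. EdgeHub: communicates with CloudHub, mainly over websocket.

EdgeHub has two kinds of clients: an HTTP client and a websocket/QUIC client. The former requests the certificates that EdgeCore needs to communicate with CloudCore; the latter handles day-to-day communication with CloudCore (resource delivery, status upload, and so on).

//EdgeHub defines edgehub object structure
type EdgeHub struct {
	certManager   certificate.CertManager
	chClient      clients.Adapter
	reconnectChan chan struct{}
	keeperLock    sync.RWMutex
	enable        bool
}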

eh.Start():

//Start sets context and starts the controller
func (eh *EdgeHub) Start() {
	eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
	eh.certManager.Start()

	HasTLSTunnelCerts <- true
	close(HasTLSTunnelCerts)

	go eh.ifRotationDone()

	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub stop")
			return
		default:
		}
		err := eh.initial()
		if err != nil {
			klog.Exitf("failed to init controller: %v", err)
			return
		}

		waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2

		err = eh.chClient.Init()
		if err != nil {
			klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
			time.Sleep(waitTime)
			continue
		}
		// execute hook func after connect
		eh.pubConnectInfo(true)
		go eh.routeToEdge()
		go eh.routeToCloud()
		go eh.keepalive()

		// wait the stop signal
		// stop authinfo manager/websocket connection
		<-eh.reconnectChan
		eh.chClient.UnInit()

		// execute hook fun after disconnect
		eh.pubConnectInfo(false)

		// sleep one period of heartbeat, then try to connect cloud hub again
		klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
		time.Sleep(waitTime)

		// clean channel
	clean:
		for {
			select {
			case <-eh.reconnectChan:
			default:
				break clean
			}
		}
	}
}
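
The "clean channel" loop at the end of Start() is a non-blocking drain. Several goroutines can each signal a reconnect, so stale signals may still be queued when the loop comes around again, and they would trigger a needless second reconnect if left in place. A standalone sketch of the idiom follows; drain is a hypothetical helper, and the buffered channel in main is only for demonstration.

```go
package main

import "fmt"

// drain removes every signal currently queued in ch without blocking and
// reports how many were discarded.
func drain(ch <-chan struct{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			// Nothing left to read right now: stop instead of blocking.
			return n
		}
	}
}

func main() {
	reconnect := make(chan struct{}, 3)
	reconnect <- struct{}{}
	reconnect <- struct{}{}
	fmt.Println(drain(reconnect), len(reconnect)) // prints "2 0"
}
```

The original uses a labeled break (break clean) because a bare break inside select would only leave the select, not the enclosing for loop; returning from a helper function achieves the same thing.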
			
		
	

 Start() first sets up the certificates, then calls eh.initial() and eh.chClient.Init() to create and initialize the cloudHubClient.

func (eh *EdgeHub) initial() (err error) {
	cloudHubClient, err := clients.GetClient()
	if err != nil {
		return err
	}

	eh.chClient = cloudHubClient

	return nil
}

Adapter is a websocket client interface.

//Adapter is a web socket client interface
type Adapter interface {
	Init() error // the client here is wsclient
	UnInit()
	// async mode
	Send(message model.Message) error
	Receive() (model.Message, error)

	// notify auth info
	Notify(authInfo map[string]string)
}
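
Because EdgeHub only talks to its client through this interface, the transport can be swapped out (websocket, QUIC, or a test double). As an illustration, the sketch below re-declares the interface with a simplified Message type and implements it with an in-memory loopback client. loopbackClient is hypothetical and, unlike a real client, its Receive does not block.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in for model.Message (assumption).
type Message struct{ Content string }

// Adapter mirrors the method set of the interface above.
type Adapter interface {
	Init() error
	UnInit()
	Send(message Message) error
	Receive() (Message, error)
	Notify(authInfo map[string]string)
}

var errNotConnected = errors.New("client not connected")

// loopbackClient is a hypothetical in-memory Adapter: whatever is sent
// can be received back. Handy as a test double.
type loopbackClient struct {
	connected bool
	ch        chan Message
}

// Compile-time check that loopbackClient satisfies Adapter.
var _ Adapter = (*loopbackClient)(nil)

func (c *loopbackClient) Init() error {
	c.ch = make(chan Message, 16)
	c.connected = true
	return nil
}

func (c *loopbackClient) UnInit() { c.connected = false }

func (c *loopbackClient) Send(m Message) error {
	if !c.connected {
		return errNotConnected
	}
	select {
	case c.ch <- m:
		return nil
	default:
		return errors.New("send buffer full")
	}
}

func (c *loopbackClient) Receive() (Message, error) {
	if !c.connected {
		return Message{}, errNotConnected
	}
	select {
	case m := <-c.ch:
		return m, nil
	default:
		return Message{}, errors.New("no message available")
	}
}

// Notify is a no-op here; a real client would refresh its auth headers.
func (c *loopbackClient) Notify(map[string]string) {}

func main() {
	var client Adapter = &loopbackClient{}
	if err := client.Init(); err != nil {
		panic(err)
	}
	defer client.UnInit()
	_ = client.Send(Message{Content: "hello"})
	msg, err := client.Receive()
	fmt.Println(msg.Content, err)
}
```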

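 The client here is wsclient, and Init() is its websocket initialization function.

Start() then calls eh.chClient.Init(), i.e. the Init above; this initialization establishes the websocket connection.

Once connected, it starts three goroutines: eh.routeToEdge(), eh.routeToCloud(), and eh.keepalive().

		// execute hook func after connect
		eh.pubConnectInfo(true)

		// routeToEdge(): receives messages via chClient.Receive() and dispatches
		// them to the corresponding edge module via eh.dispatch(message)
		go eh.routeToEdge()

		// routeToCloud(): receives messages from the edge via
		// beehiveContext.Receive(modules.EdgeHubModuleName) and sends them to
		// CloudHub via eh.sendToCloud(message)
		go eh.routeToCloud()

		go eh.keepalive()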

eh.routeToEdge(): receives messages from CloudHub and dispatches them to the modules on the edge.

func (eh *EdgeHub) routeToEdge() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToEdge stop")
			return
		default:
		}
		message, err := eh.chClient.Receive()
		if err != nil {
			klog.Errorf("websocket read error: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}

		klog.V(4).Infof("[edgehub/routeToEdge] receive msg from cloud, msg: %+v", message)
		err = eh.dispatch(message)
		if err != nil {
			klog.Errorf("failed to dispatch message, discard: %v", err)
		}
	}
}

 eh.routeToCloud(): receives the messages that edge modules send to EdgeHub and forwards them to CloudHub.

func (eh *EdgeHub) routeToCloud() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToCloud stop")
			return
		default:
		}
		message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
		if err != nil {
			klog.Errorf("failed to receive message from edge: %v", err)
			time.Sleep(time.Second)
			continue
		}

		// post message to cloud hub
		err = eh.sendToCloud(message)
		if err != nil {
			klog.Errorf("failed to send message to cloud: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}
	}
}

  

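MetaManager: passes messages between Edged and EdgeHub, and stores and retrieves metadata in SQLite.

When the MetaManager module starts, it launches two goroutines: one periodically (every 60s by default) sends itself a message that triggers edge-to-cloud synchronization of pod status data; the other processes the data exchanged between EdgeHub and Edged/EdgeMesh.

Data reaching MetaManager comes from two sources: EdgeHub, carrying cloud-to-edge data (①), and Edged, carrying edge-to-cloud data (②).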

func (m *metaManager) Start() {
	if metaserverconfig.Config.Enable {
		imitator.StorageInit()
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	}
	go func() {
		period := getSyncInterval()
		timer := time.NewTimer(period)
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager stop")
				return
			case <-timer.C:
				timer.Reset(period)
				msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
				beehiveContext.Send(MetaManagerModuleName, *msg)
			}
		}
	}()

	m.runMetaManager()
}

func (m *metaManager) runMetaManager() {
	go func() {
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			}
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil {
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			}
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		}
	}()
}

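m.process(message): persists the message.

 Edged:

When Edged starts, it first initializes and starts each of its modules, and finally syncs pods. The creation of a pod shows how the modules in Edged cooperate to manage the pod lifecycle.

When Edged receives a pod insert message, it adds the full pod information to the podManager and probeManager, and adds a namespace/name key to the podAdditionQueue.

It then starts a goroutine that creates the pods delivered to this node.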
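
The insert path described above follows a common store-plus-queue pattern: keep the full object in a store, put only a small key on a queue, and let a worker goroutine look the object up when it is ready to act. The sketch below illustrates that idea and is not the Edged code. It assumes the queue key is the pod's namespace/name, and Pod, podStore, handleInsert, createPods, and run are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Pod is a drastically simplified pod (assumption).
type Pod struct {
	Namespace string
	Name      string
	Image     string
}

func podKey(p Pod) string { return p.Namespace + "/" + p.Name }

// podStore plays the role of the pod manager: full objects, looked up by key.
type podStore struct {
	mu   sync.RWMutex
	pods map[string]Pod
}

func (s *podStore) add(p Pod) {
	s.mu.Lock()
	s.pods[podKey(p)] = p
	s.mu.Unlock()
}

func (s *podStore) get(key string) (Pod, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	p, ok := s.pods[key]
	return p, ok
}

// handleInsert stores the full pod, then enqueues only its key.
func handleInsert(store *podStore, queue chan<- string, p Pod) {
	store.add(p)
	queue <- podKey(p)
}

// createPods consumes keys until the queue is closed and reports each pod
// it "creates" on the created channel.
func createPods(store *podStore, queue <-chan string, created chan<- string) {
	for key := range queue {
		if pod, ok := store.get(key); ok {
			created <- fmt.Sprintf("%s (image %s)", key, pod.Image)
		}
	}
	close(created)
}

// run wires the pieces together and returns the creation log in order.
func run(pods []Pod) []string {
	store := &podStore{pods: map[string]Pod{}}
	queue := make(chan string, len(pods))
	created := make(chan string, len(pods))
	go createPods(store, queue, created)
	for _, p := range pods {
		handleInsert(store, queue, p)
	}
	close(queue)
	var log []string
	for c := range created {
		log = append(log, c)
	}
	return log
}

func main() {
	for _, line := range run([]Pod{
		{"default", "nginx", "nginx:1.25"},
		{"edge", "mqtt", "eclipse-mosquitto:2"},
	}) {
		fmt.Println(line)
	}
}
```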

 

 

 

 

 

 

 

 

 
