kubeedge源码分析课程学习

Posted Kris_u

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kubeedge源码分析课程学习相关的知识,希望对你有一定的参考价值。

kubeedge的核心理念:

支持CRI集成Containerd、CRI-O,优化runtime资源消耗。

kubeedge 架构:

 

 CSI是Container Storage Interface(容器存储接口)的简写.

1.CSI规范:

https://github.com/container-storage-interface/spec

KubeEdge 云端组件:

EdgeController 结构体: 

// EdgeController use beehive context message layer
type EdgeController struct 
	config     v1alpha1.EdgeController
	upstream   *controller.UpstreamController
	downstream *controller.DownstreamController

Start EdgeController :

func (ec *EdgeController) Start() 
	if err := ec.upstream.Start(); err != nil 
		klog.Exitf("start upstream failed with error: %s", err)
	

	if err := ec.downstream.Start(); err != nil 
		klog.Exitf("start downstream failed with error: %s", err)
	

 DeviceController包含:Upstream ControllerDownstream Controller

// DeviceController use beehive context message layer
type DeviceController struct 
	downstream *controller.DownstreamController
	upstream   *controller.UpstreamController
	enable     bool

下行streamController watch kubernetes api server 并将变更发送至边缘节点。

// DownstreamController watch kubernetes api server and send change to edge
type DownstreamController struct 
	kubeClient   kubernetes.Interface
	messageLayer messagelayer.MessageLayer

	deviceManager      *manager.DeviceManager
	deviceModelManager *manager.DeviceModelManager
	configMapManager   *manager.ConfigMapManager

 DownstreamController .Start() 中有2个goroutine:

  • go dc.syncDeviceModel()   
  • go dc.syncDevice()
    // Start DownstreamController
    func (dc *DownstreamController) Start() error 
    	klog.Info("Start downstream devicecontroller")
    
    	go dc.syncDeviceModel()
    
    	// Wait for adding all device model
    	// TODO need to think about sync
    	time.Sleep(1 * time.Second)
    	go dc.syncDevice()
    
    	return nil
    

上行streamController订阅边缘节点信息并同步至 K8S api-server:

CustomResourceDefinition (CRD):创建一个新的CRD时,Kubernetes API服务器将为你指定的每个版本创建一个新的RESTful资源路径,我们可以根据该api路径来创建一些我们自己定义的类型资源。CRD可以是命名空间的,也可以是集群范围的,由CRD的作用域(scpoe)字段中所指定的,与现有的内置对象一样,删除名称空间将删除该名称空间中的所有自定义对象。

// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct 
	crdClient    crdClientset.Interface
	messageLayer messagelayer.MessageLayer
	// message channel
	deviceStatusChan chan model.Message

	// downstream controller to update device status in cache
	dc *DownstreamController
// Start UpstreamController
func (uc *UpstreamController) Start() error 
	klog.Info("Start upstream devicecontroller")

	uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
	go uc.dispatchMessage()

	for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ 
		go uc.updateDeviceStatus()
	
	return nil

cloudHub:

type cloudHub struct 
	enable               bool
	informersSyncedFuncs []cache.InformerSynced
	messageq             *channelq.ChannelMessageQueue

 DispatchMessage():从云端拿到message,并从中解析出node Id;把message放入消息队列推送给相关节点。

func (a *cloudHub) Start() 
    ...

	// start dispatch message from the cloud to edge node
	go a.messageq.DispatchMessage()

	...
	// HttpServer mainly used to issue certificates for the edge
	go httpserver.StartHTTPServer()

	servers.StartCloudHub(a.messageq)

	if hubconfig.Config.UnixSocket.Enable 
		// The uds server is only used to communicate with csi driver from kubeedge on cloud.
		// It is not used to communicate between cloud and edge.
		go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
	
// StartCloudHub starts the cloud hub service
func StartCloudHub(messageq *channelq.ChannelMessageQueue) 
	handler.InitHandler(messageq)
	// start websocket server
	if hubconfig.Config.WebSocket.Enable 
		go startWebsocketServer()
	
	// start quic server
	if hubconfig.Config.Quic.Enable 
		go startQuicServer()
	

KubeEdge 边缘组件:

1、EdgeHub:  主要通过websocket与CloudHub进行通信

EdgeHub中有两类client,分别是httpclient以及websocket/quic client,前者用于与EdgeCore与CloudCore通信所需证书的申请,后者负责与CloudCore的日常通信(资源下发、状态上传等) 

//EdgeHub defines edgehub object structure
type EdgeHub struct 
	certManager   certificate.CertManager
	chClient      clients.Adapter
	reconnectChan chan struct
	keeperLock    sync.RWMutex
	enable        bool

eh.Start():

//Start sets context and starts the controller
func (eh *EdgeHub) Start() 
	eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
	eh.certManager.Start()

	HasTLSTunnelCerts <- true
	close(HasTLSTunnelCerts)

	go eh.ifRotationDone()

	for 
		select 
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub stop")
			return
		default:
		
		err := eh.initial()
		if err != nil 
			klog.Exitf("failed to init controller: %v", err)
			return
		

		waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2

		err = eh.chClient.Init()
		if err != nil 
			klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
			time.Sleep(waitTime)
			continue
		
		// execute hook func after connect
		eh.pubConnectInfo(true)
		go eh.routeToEdge()
		go eh.routeToCloud()
		go eh.keepalive()

		// wait the stop signal
		// stop authinfo manager/websocket connection
		<-eh.reconnectChan
		eh.chClient.UnInit()

		// execute hook fun after disconnect
		eh.pubConnectInfo(false)

		// sleep one period of heartbeat, then try to connect cloud hub again
		klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
		time.Sleep(waitTime)

		// clean channel
	clean:
		for 
			select 
			case <-eh.reconnectChan:
			default:
				break clean
			
		
	

 Start()首先设置证书,然后是 eh.initial()和 eh.chClient.Init():建立并初始化cloudHubClient

func (eh *EdgeHub) initial() (err error) 
	cloudHubClient, err := clients.GetClient()
	if err != nil 
		return err
	

	eh.chClient = cloudHubClient

	return nil

Adapter 是一个websocket客户端接口。

//Adapter is a web socket client interface
type Adapter interface 
	Init() error  //此处的clients 为 wsclient
	UnInit()
	// async mode
	Send(message model.Message) error
	Receive() (model.Message, error)

	// notify auth info
	Notify(authInfo map[string]string)

 此处的clients 为 wsclientInit()websocket的初始化 函数。

Start()函数接着执行了eh.chClient.Init(), 也就是上面的 Init, 初始化过程执行了 websocket 的连接.

连接成功后启动了三个goroutineeh.routeToEdge()eh.routeToCloud()eh.keepalive()

// execute hook func after connect
		eh.pubConnectInfo(true)

        //routeToEdge(): 接收信息chClient.Receive()并分发eh.dispatch(message)给对应的 edge 
		go eh.routeToEdge() 

        //routeToCloud(): 接收来自边缘的信息beehiveContext.Receive(ModuleNameEdgeHub), 并发到 cloudhub eh.sendToCloud(message)
		go eh.routeToCloud()


		go eh.keepalive()

eh.routeToEdge(): 从cloudHub接收massage并转发给edgeHub 。

func (eh *EdgeHub) routeToEdge() 
	for 
		select 
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToEdge stop")
			return
		default:
		
		message, err := eh.chClient.Receive()
		if err != nil 
			klog.Errorf("websocket read error: %v", err)
			eh.reconnectChan <- struct
			return
		

		klog.V(4).Infof("[edgehub/routeToEdge] receive msg from cloud, msg:% +v", message)
		err = eh.dispatch(message)
		if err != nil 
			klog.Errorf("failed to dispatch message, discard: %v", err)
		
	

 eh.routeToCloud():从  EdgeHub 接收massage,转发给CloudHub

func (eh *EdgeHub) routeToCloud() 
	for 
		select 
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToCloud stop")
			return
		default:
		
		message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
		if err != nil 
			klog.Errorf("failed to receive message from edge: %v", err)
			time.Sleep(time.Second)
			continue
		

		// post message to cloud hub
		err = eh.sendToCloud(message)
		if err != nil 
			klog.Errorf("failed to send message to cloud: %v", err)
			eh.reconnectChan <- struct
			return
		
	

  

metaManager:在Edged和EdgeHub之间传递信息,从SQLite中存取设备元数据。

当metamanager模块启动时,会开启两个goroutine,一个用于定时(默认60s)给自己发送消息通知进行边到云的podstatus数据同步;一个用于edgehub与edged/edgemesh的数据处理。

到达memanager的数据来源于两部分,一是edgehub,此时是云到边的数据,记为①;二是edged,此时是边到云的数据,记为②。

func (m *metaManager) Start() 
	if metaserverconfig.Config.Enable 
		imitator.StorageInit()
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	
	go func() 
		period := getSyncInterval()
		timer := time.NewTimer(period)
		for 
			select 
			case <-beehiveContext.Done():
				klog.Warning("MetaManager stop")
				return
			case <-timer.C:
				timer.Reset(period)
				msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
				beehiveContext.Send(MetaManagerModuleName, *msg)
			
		
	()

	m.runMetaManager()

func (m *metaManager) runMetaManager() 
	go func() 
		for 
			select 
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil 
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		
	()

m.process(message):  //持久化

 Edged:

当edged启动时,首先初始化并启动各个模块,最后进行pod的sync。下面以一个pod的创建来看一下edged中各个模块是如何协作完成pod的生命周期管理的。

当edged接收到pod的insert消息时,将pod所有信息加入podmanager、probemanager,podAdditionQueue加入node-namespace/node-name信息。

启动一个goroutine,创建下发到此节点的pod。

 

 

 

 

 

 

 

 

 

 

以上是关于kubeedge源码分析课程学习的主要内容,如果未能解决你的问题,请参考以下文章

kubeedge: keadm 源码学习

技术干货EdgeMesh使用和源码分析

kubeedge: keadm 源码学习

Sedna终身学习以及KubeEdge梳理

KubeEdge发布云原生边缘计算威胁模型及安全防护技术白皮书

drg-视图简写流程分析