KubeEdge Source Code Analysis
Posted by Kris_u
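Core ideas of KubeEdge:
Advantages:
- Kubernetes-native support: with KubeEdge, users can orchestrate applications, manage devices, and monitor application/device status on edge nodes, just as they would with a Kubernetes cluster in the cloud.
- Reliable cloud-edge collaboration: messages are delivered reliably, without loss, over an unstable cloud-edge network.
- Edge autonomy: when the cloud-edge network is unstable, or the edge goes offline or restarts, edge nodes keep running autonomously and the applications on them keep running normally.
- Edge device management: edge devices are managed through the native Kubernetes API, using CRDs.
- Extremely lightweight edge agent: EdgeCore is a very lightweight agent that runs on resource-constrained edge nodes.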
KubeEdge architecture:
CSI stands for Container Storage Interface.
CSI Driver from KubeEdge: this is more like a CSI Driver proxy, and it implements all of the CSI Identity and Controller interfaces. It sends messages to CloudHub, which forwards them to the edge. All of the actions in the Volume Lifecycle are actually executed by the CSI Driver from Vendor at the edge.
1. CSI specification:
https://github.com/container-storage-interface/spec
KubeEdge cloud components:
The EdgeController struct:
// EdgeController use beehive context message layer
type EdgeController struct {
	config     v1alpha1.EdgeController
	upstream   *controller.UpstreamController
	downstream *controller.DownstreamController
}
Starting the EdgeController:
func (ec *EdgeController) Start() {
	if err := ec.upstream.Start(); err != nil {
		klog.Exitf("start upstream failed with error: %s", err)
	}
	if err := ec.downstream.Start(); err != nil {
		klog.Exitf("start downstream failed with error: %s", err)
	}
}
// Start UpstreamController
func (uc *UpstreamController) Start() error {
	klog.Info("start upstream controller")
	go uc.dispatchMessage()
	for i := 0; i < int(uc.config.Load.UpdateNodeStatusWorkers); i++ {
		go uc.updateNodeStatus()
	}
	for i := 0; i < int(uc.config.Load.UpdatePodStatusWorkers); i++ {
		go uc.updatePodStatus()
	}
	for i := 0; i < int(uc.config.Load.QueryConfigMapWorkers); i++ {
		go uc.queryConfigMap()
	}
	for i := 0; i < int(uc.config.Load.QuerySecretWorkers); i++ {
		go uc.querySecret()
	}
	for i := 0; i < int(uc.config.Load.ServiceAccountTokenWorkers); i++ {
		go uc.processServiceAccountToken()
	}
	for i := 0; i < int(uc.config.Load.QueryPersistentVolumeWorkers); i++ {
		go uc.queryPersistentVolume()
	}
	for i := 0; i < int(uc.config.Load.QueryPersistentVolumeClaimWorkers); i++ {
		go uc.queryPersistentVolumeClaim()
	}
	for i := 0; i < int(uc.config.Load.QueryVolumeAttachmentWorkers); i++ {
		go uc.queryVolumeAttachment()
	}
	for i := 0; i < int(uc.config.Load.QueryNodeWorkers); i++ {
		go uc.queryNode()
	}
	for i := 0; i < int(uc.config.Load.UpdateNodeWorkers); i++ {
		go uc.updateNode()
	}
	for i := 0; i < int(uc.config.Load.DeletePodWorkers); i++ {
		go uc.deletePod()
	}
	for i := 0; i < int(uc.config.Load.UpdateRuleStatusWorkers); i++ {
		go uc.updateRuleStatus()
	}
	return nil
}
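The Start() method above follows a common Go pattern: one dispatcher goroutine routes incoming messages to per-resource channels, and a configurable number of workers drains each channel. The following is only a minimal sketch of that pattern, not KubeEdge's implementation; the `message` type, the two resource names, and `fanOut` are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for KubeEdge's model.Message.
type message struct {
	resourceType string // e.g. "nodestatus" or "podstatus" (illustrative names)
	content      string
}

// fanOut starts `workers` goroutines per resource channel, dispatches every
// message to the channel matching its resource type, and returns how many
// messages each resource type handled. Unknown types are dropped.
func fanOut(msgs []message, workers int) map[string]int {
	chans := map[string]chan message{
		"nodestatus": make(chan message, 8),
		"podstatus":  make(chan message, 8),
	}
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		handled = map[string]int{}
	)
	for name, ch := range chans {
		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func(name string, ch <-chan message) {
				defer wg.Done()
				for range ch { // each worker drains its channel until it is closed
					mu.Lock()
					handled[name]++
					mu.Unlock()
				}
			}(name, ch)
		}
	}
	// Dispatcher: route by resource type.
	for _, m := range msgs {
		if ch, ok := chans[m.resourceType]; ok {
			ch <- m
		}
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return handled
}

func main() {
	got := fanOut([]message{
		{"nodestatus", "n1"}, {"podstatus", "p1"}, {"podstatus", "p2"}, {"unknown", "x"},
	}, 2)
	fmt.Println(got["nodestatus"], got["podstatus"]) // prints "1 2"
}
```

Giving each resource type its own channel and worker count lets a busy type (pod status, for example) be scaled without affecting the others, which is presumably why the worker counts are exposed in the configuration.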
// Start DownstreamController
func (dc *DownstreamController) Start() error {
	klog.Info("start downstream controller")
	// pod
	go dc.syncPod()
	// configmap
	go dc.syncConfigMap()
	// secret
	go dc.syncSecret()
	// nodes
	go dc.syncEdgeNodes()
	// rule
	go dc.syncRule()
	// ruleendpoint
	go dc.syncRuleEndpoint()
	return nil
}
DeviceController
It consists of an Upstream Controller and a Downstream Controller.
// DeviceController use beehive context message layer
type DeviceController struct {
	downstream *controller.DownstreamController
	upstream   *controller.UpstreamController
	enable     bool
}
cloud/pkg/devicecontroller/controller/upstream.go: UpstreamController
The UpstreamController subscribes to messages from the edge and syncs them to the Kubernetes API server.
// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
	crdClient    crdClientset.Interface
	messageLayer messagelayer.MessageLayer
	// message channel
	deviceStatusChan chan model.Message
	// downstream controller to update device status in cache
	dc *DownstreamController
}
cloud/pkg/devicecontroller/controller/downstream.go: DownstreamController
The DownstreamController watches the Kubernetes API server and sends changes to the edge nodes.
// DownstreamController watch kubernetes api server and send change to edge
type DownstreamController struct {
	kubeClient   kubernetes.Interface
	messageLayer messagelayer.MessageLayer

	deviceManager      *manager.DeviceManager
	deviceModelManager *manager.DeviceModelManager
	configMapManager   *manager.ConfigMapManager
}
DownstreamController.Start() launches two goroutines:
- go dc.syncDeviceModel()
- go dc.syncDevice()
// Start DownstreamController
func (dc *DownstreamController) Start() error {
	klog.Info("Start downstream devicecontroller")

	go dc.syncDeviceModel()

	// Wait for adding all device model
	// TODO need to think about sync
	time.Sleep(1 * time.Second)
	go dc.syncDevice()

	return nil
}
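The UpstreamController subscribes to messages from the edge nodes and syncs them to the Kubernetes API server:
CustomResourceDefinition (CRD): when you create a new CRD, the Kubernetes API server creates a new RESTful resource path for each version you specify, and you can use that API path to create resources of your own custom types. A CRD can be either namespaced or cluster-scoped, as specified in the CRD's scope field. As with existing built-in objects, deleting a namespace deletes all custom objects in that namespace.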
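To make the scope distinction concrete, here is a small sketch of how the resource path differs between a namespaced and a cluster-scoped custom resource. The helper `crdPath` is hypothetical, and the group, version, and plural names are illustrative placeholders rather than KubeEdge's actual device CRDs.

```go
package main

import "fmt"

// crdPath builds the REST path under which the API server serves a custom
// resource collection. An empty namespace means the CRD is cluster-scoped.
func crdPath(group, version, namespace, plural string) string {
	if namespace == "" {
		return fmt.Sprintf("/apis/%s/%s/%s", group, version, plural)
	}
	return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", group, version, namespace, plural)
}

func main() {
	// Namespaced CRD: objects live under a namespace segment.
	fmt.Println(crdPath("stable.example.com", "v1", "default", "crontabs"))
	// prints "/apis/stable.example.com/v1/namespaces/default/crontabs"

	// Cluster-scoped CRD: no namespace segment.
	fmt.Println(crdPath("stable.example.com", "v1", "", "crontabs"))
	// prints "/apis/stable.example.com/v1/crontabs"
}
```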
// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
	crdClient    crdClientset.Interface
	messageLayer messagelayer.MessageLayer
	// message channel
	deviceStatusChan chan model.Message
	// downstream controller to update device status in cache
	dc *DownstreamController
}
// Start UpstreamController
func (uc *UpstreamController) Start() error {
	klog.Info("Start upstream devicecontroller")

	uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
	go uc.dispatchMessage()

	for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
		go uc.updateDeviceStatus()
	}
	return nil
}
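cloudHub:
type cloudHub struct {
	enable               bool
	informersSyncedFuncs []cache.InformerSynced
	messageq             *channelq.ChannelMessageQueue
}
DispatchMessage(): takes a message from the cloud, parses the node ID out of it, and puts the message into the message queue to be pushed to the corresponding node.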
func (a *cloudHub) Start() {
	...
	// start dispatch message from the cloud to edge node
	go a.messageq.DispatchMessage()
	...
	// HttpServer mainly used to issue certificates for the edge
	go httpserver.StartHTTPServer()

	servers.StartCloudHub(a.messageq)

	if hubconfig.Config.UnixSocket.Enable {
		// The uds server is only used to communicate with csi driver from kubeedge on cloud.
		// It is not used to communicate between cloud and edge.
		go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
	}
}
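The idea behind DispatchMessage(), extracting the target node from each message and queueing the message for that node, can be sketched as follows. This is a simplified illustration under stated assumptions: the `message` type, the `dispatcher` type, and the resource layout `node/<nodeID>/...` are assumptions for the example, not a copy of the channelq code.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for model.Message.
type message struct {
	resource string // assumed layout: "node/<nodeID>/<namespace>/<type>/<name>"
	content  string
}

// nodeIDOf extracts the node ID from a resource string of the assumed layout.
func nodeIDOf(m message) (string, error) {
	parts := strings.Split(m.resource, "/")
	if len(parts) < 2 || parts[0] != "node" || parts[1] == "" {
		return "", fmt.Errorf("no node ID in resource %q", m.resource)
	}
	return parts[1], nil
}

// dispatcher keeps one FIFO queue per edge node.
type dispatcher struct {
	queues map[string][]message
}

// dispatch appends the message to the queue of the node it is addressed to.
func (d *dispatcher) dispatch(m message) error {
	id, err := nodeIDOf(m)
	if err != nil {
		return err
	}
	d.queues[id] = append(d.queues[id], m)
	return nil
}

func main() {
	d := &dispatcher{queues: map[string][]message{}}
	for _, m := range []message{
		{"node/edge-1/default/pod/nginx", "create"},
		{"node/edge-2/default/pod/redis", "create"},
		{"node/edge-1/default/pod/nginx", "delete"},
		{"bad-resource", "ignored"},
	} {
		if err := d.dispatch(m); err != nil {
			fmt.Println("skip:", err)
		}
	}
	fmt.Println(len(d.queues["edge-1"]), len(d.queues["edge-2"])) // prints "2 1"
}
```

Keeping a separate queue per node preserves message order for each node while letting a slow or offline node fall behind without blocking delivery to the others.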
// StartCloudHub starts the cloud hub service
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
	handler.InitHandler(messageq)
	// start websocket server
	if hubconfig.Config.WebSocket.Enable {
		go startWebsocketServer()
	}
	// start quic server
	if hubconfig.Config.Quic.Enable {
		go startQuicServer()
	}
}
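KubeEdge edge components:
1. EdgeHub: communicates with CloudHub, mainly over WebSocket.
EdgeHub has two kinds of clients: an HTTP client and a WebSocket/QUIC client. The former requests the certificates that EdgeCore needs to communicate with CloudCore; the latter handles day-to-day communication with CloudCore (resource delivery, status upload, and so on).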
//EdgeHub defines edgehub object structure
type EdgeHub struct {
	certManager   certificate.CertManager
	chClient      clients.Adapter
	reconnectChan chan struct{}
	keeperLock    sync.RWMutex
	enable        bool
}
eh.Start():
//Start sets context and starts the controller
func (eh *EdgeHub) Start() {
	eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
	eh.certManager.Start()

	HasTLSTunnelCerts <- true
	close(HasTLSTunnelCerts)

	go eh.ifRotationDone()

	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub stop")
			return
		default:
		}
		err := eh.initial()
		if err != nil {
			klog.Exitf("failed to init controller: %v", err)
			return
		}

		waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2

		err = eh.chClient.Init()
		if err != nil {
			klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
			time.Sleep(waitTime)
			continue
		}
		// execute hook func after connect
		eh.pubConnectInfo(true)
		go eh.routeToEdge()
		go eh.routeToCloud()
		go eh.keepalive()

		// wait the stop signal
		// stop authinfo manager/websocket connection
		<-eh.reconnectChan
		eh.chClient.UnInit()

		// execute hook fun after disconnect
		eh.pubConnectInfo(false)

		// sleep one period of heartbeat, then try to connect cloud hub again
		klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
		time.Sleep(waitTime)

		// clean channel
	clean:
		for {
			select {
			case <-eh.reconnectChan:
			default:
				break clean
			}
		}
	}
}
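The "clean channel" step at the end of the loop is worth a closer look: several goroutines may each report the same broken connection, so any leftover signals have to be discarded before reconnecting, or the next connection would be torn down immediately. Below is a minimal sketch of that non-blocking drain. The `drain` helper is hypothetical, and a buffered channel is assumed here purely to keep the example deterministic.

```go
package main

import "fmt"

// drain empties a signal channel without blocking and reports how many
// stale signals were discarded.
func drain(ch <-chan struct{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			// Nothing left to read: stop instead of blocking.
			return n
		}
	}
}

func main() {
	reconnect := make(chan struct{}, 4)

	// Two goroutines report the same broken connection.
	reconnect <- struct{}{}
	reconnect <- struct{}{}

	<-reconnect // the main loop consumes one signal and tears the connection down

	fmt.Println(drain(reconnect)) // prints "1": the duplicate signal is discarded
	fmt.Println(drain(reconnect)) // prints "0": the channel is already clean
}
```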
Start() first sets up the certificates, then calls eh.initial() and eh.chClient.Init() to create and initialize the cloudHubClient:
func (eh *EdgeHub) initial() (err error) {
	cloudHubClient, err := clients.GetClient()
	if err != nil {
		return err
	}

	eh.chClient = cloudHubClient

	return nil
}
Adapter is a WebSocket client interface.
//Adapter is a web socket client interface
type Adapter interface {
	Init() error // the client here is wsclient
	UnInit()
	// async mode
	Send(message model.Message) error
	Receive() (model.Message, error)

	// notify auth info
	Notify(authInfo map[string]string)
}
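The client here is wsclient, and Init() is the WebSocket initialization function. Start() then calls eh.chClient.Init(), the Init shown above, which establishes the WebSocket connection. Once the connection succeeds, three goroutines are started: eh.routeToEdge(), eh.routeToCloud(), and eh.keepalive().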
// execute hook func after connect
eh.pubConnectInfo(true)
// routeToEdge(): receives messages via chClient.Receive() and dispatches them with eh.dispatch(message) to the corresponding edge module
go eh.routeToEdge()
// routeToCloud(): receives messages from the edge via beehiveContext.Receive(ModuleNameEdgeHub) and sends them to CloudHub with eh.sendToCloud(message)
go eh.routeToCloud()
go eh.keepalive()
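Because EdgeHub only talks to the cloud through the Adapter interface, the transport can be swapped out (WebSocket, QUIC, or a fake for testing). As a rough sketch of what that abstraction buys, here is an in-memory loopback client. The `Message` type and `loopback` implementation are hypothetical simplifications, and Notify is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical, simplified stand-in for model.Message.
type Message struct{ Content string }

// Adapter mirrors the shape of the interface above, minus Notify.
type Adapter interface {
	Init() error
	UnInit()
	Send(m Message) error
	Receive() (Message, error)
}

// loopback is an in-memory Adapter: whatever is sent can be received back.
// It is not safe to call Send after UnInit.
type loopback struct{ ch chan Message }

func (l *loopback) Init() error { l.ch = make(chan Message, 16); return nil }
func (l *loopback) UnInit()     { close(l.ch) }

// Send queues the message, failing instead of blocking when the buffer is full.
func (l *loopback) Send(m Message) error {
	select {
	case l.ch <- m:
		return nil
	default:
		return errors.New("send buffer full")
	}
}

// Receive blocks until a message arrives or the connection is closed.
func (l *loopback) Receive() (Message, error) {
	m, ok := <-l.ch
	if !ok {
		return Message{}, errors.New("connection closed")
	}
	return m, nil
}

func main() {
	var c Adapter = &loopback{}
	if err := c.Init(); err != nil {
		panic(err)
	}
	_ = c.Send(Message{Content: "hello"})
	m, _ := c.Receive()
	fmt.Println(m.Content) // prints "hello"

	c.UnInit()
	_, err := c.Receive()
	fmt.Println(err) // prints "connection closed"
}
```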
eh.routeToEdge(): receives messages from CloudHub and dispatches them to the corresponding edge modules.
func (eh *EdgeHub) routeToEdge() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToEdge stop")
			return
		default:
		}
		message, err := eh.chClient.Receive()
		if err != nil {
			klog.Errorf("websocket read error: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}

		klog.V(4).Infof("[edgehub/routeToEdge] receive msg from cloud, msg: %+v", message)
		err = eh.dispatch(message)
		if err != nil {
			klog.Errorf("failed to dispatch message, discard: %v", err)
		}
	}
}
eh.routeToCloud(): receives messages addressed to the EdgeHub module and forwards them to CloudHub.
func (eh *EdgeHub) routeToCloud() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToCloud stop")
			return
		default:
		}
		message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
		if err != nil {
			klog.Errorf("failed to receive message from edge: %v", err)
			time.Sleep(time.Second)
			continue
		}

		// post message to cloud hub
		err = eh.sendToCloud(message)
		if err != nil {
			klog.Errorf("failed to send message to cloud: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}
	}
}
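Both routing goroutines share the same shape: loop until the context is cancelled, and on a transport error signal the main loop to reconnect, then exit. The sketch below isolates that shape with a fake receiver that fails once its messages run out. The `receiver`, `flaky`, `pump`, and `runPump` names are hypothetical and exist only for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// receiver is a hypothetical subset of the Adapter interface.
type receiver interface {
	Receive() (string, error)
}

// flaky returns its queued messages, then fails like a dropped connection.
type flaky struct{ msgs []string }

func (f *flaky) Receive() (string, error) {
	if len(f.msgs) == 0 {
		return "", errors.New("connection reset")
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}

// pump forwards messages to handle until ctx is cancelled or Receive fails.
// On failure it signals reconnect once and exits.
func pump(ctx context.Context, r receiver, handle func(string), reconnect chan<- struct{}) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		m, err := r.Receive()
		if err != nil {
			reconnect <- struct{}{}
			return
		}
		handle(m)
	}
}

// runPump starts a pump over msgs and waits for its reconnect signal.
func runPump(msgs []string) []string {
	reconnect := make(chan struct{})
	var got []string
	go pump(context.Background(), &flaky{msgs: msgs},
		func(m string) { got = append(got, m) }, reconnect)
	<-reconnect // blocks until the pump reports the failure
	return got
}

func main() {
	fmt.Println(runPump([]string{"a", "b"})) // prints "[a b]"
}
```

Exiting the goroutine after signalling, rather than retrying in place, keeps reconnection logic in one place: the main loop tears down the client, waits, and starts fresh goroutines on the new connection.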
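metaManager: passes messages between Edged and EdgeHub, and stores and retrieves metadata in SQLite.
When the metamanager module starts, it launches two goroutines: one periodically (every 60s by default) sends itself a message to trigger edge-to-cloud podstatus synchronization; the other processes data exchanged between edgehub and edged/edgemesh.
Data reaching metamanager comes from two sources: edgehub, which carries cloud-to-edge data (denoted ①), and edged, which carries edge-to-cloud data (denoted ②).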
func (m *metaManager) Start() {
	if metaserverconfig.Config.Enable {
		imitator.StorageInit()
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	}
	go func() {
		period := getSyncInterval()
		timer := time.NewTimer(period)
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager stop")
				return
			case <-timer.C:
				timer.Reset(period)
				msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
				beehiveContext.Send(MetaManagerModuleName, *msg)
			}
		}
	}()

	m.runMetaManager()
}
func (m *metaManager) runMetaManager() {
	go func() {
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			}
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil {
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			}
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		}
	}()
}
m.process(message): handles the message and persists it.
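The periodic self-notification used by metaManager is a timer loop that stops when the surrounding context is cancelled. A minimal, self-contained sketch of that loop follows; `periodicSync`, `collect`, the notification string, and the 5ms period are all hypothetical choices for the example (the real default interval mentioned above is 60s).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// periodicSync sends a sync notification on out every period until ctx is
// cancelled, resetting the timer after each tick.
func periodicSync(ctx context.Context, period time.Duration, out chan<- string) {
	timer := time.NewTimer(period)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			timer.Reset(period)
			select {
			case out <- "sync-podstatus": // hypothetical notification payload
			case <-ctx.Done():
				return
			}
		}
	}
}

// collect runs periodicSync with a short period and returns the first n
// notifications, then cancels the loop.
func collect(n int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := make(chan string)
	go periodicSync(ctx, 5*time.Millisecond, out)
	got := make([]string, 0, n)
	for i := 0; i < n; i++ {
		got = append(got, <-out)
	}
	return got
}

func main() {
	fmt.Println(collect(3)) // prints "[sync-podstatus sync-podstatus sync-podstatus]"
}
```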
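Edged:
When edged starts, it first initializes and starts its modules, and finally syncs pods. The following walks through the creation of a pod to show how the modules in edged cooperate to manage the pod lifecycle.
When edged receives a pod insert message, it adds the full pod information to podmanager and probemanager, and adds the pod's namespace/name key to podAdditionQueue.
It then starts a goroutine that creates the pods delivered to this node.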
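The insert flow described above (store the pod, enqueue its key, let a worker goroutine create it) can be sketched as follows. This is a simplified illustration, not edged's code: the `pod` and `podStore` types, the channel used as a queue, and the "creation" step that merely records the key are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a hypothetical, minimal stand-in for a Kubernetes Pod object.
type pod struct{ namespace, name string }

// key returns the namespace/name key used to identify a pod in the queue.
func key(p pod) string { return p.namespace + "/" + p.name }

// podStore plays the role of the pod manager: it remembers every known pod.
type podStore struct {
	mu   sync.Mutex
	pods map[string]pod
}

func (s *podStore) add(p pod) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pods[key(p)] = p
}

func (s *podStore) get(k string) (pod, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p, ok := s.pods[k]
	return p, ok
}

// run handles a batch of insert events: each pod is stored and its key is
// queued; a single worker goroutine consumes the keys and "creates" the pods.
// It returns the keys in the order they were created.
func run(inserts []pod) []string {
	store := &podStore{pods: map[string]pod{}}
	queue := make(chan string, len(inserts))
	created := make([]string, 0, len(inserts))
	done := make(chan struct{})

	go func() {
		defer close(done)
		for k := range queue {
			if p, ok := store.get(k); ok {
				created = append(created, key(p)) // stands in for starting containers
			}
		}
	}()

	for _, p := range inserts {
		store.add(p)
		queue <- key(p)
	}
	close(queue)
	<-done
	return created
}

func main() {
	fmt.Println(run([]pod{{"default", "nginx"}, {"kube-system", "proxy"}}))
	// prints "[default/nginx kube-system/proxy]"
}
```

Queueing only the key, and looking the pod up again when the worker runs, means the worker always acts on the latest stored version of the pod rather than a stale copy captured at enqueue time.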