KubeEdge Source Code Analysis: Course Notes
Posted by Kris_u
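KubeEdge core design ideas:
Supports CRI integration with Containerd and CRI-O, and reduces runtime resource consumption.
KubeEdge architecture:
CSI is short for Container Storage Interface.
1. CSI specification:
https://github.com/container-storage-interface/spec
KubeEdge cloud components:
The EdgeController struct: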
// EdgeController use beehive context message layer
type EdgeController struct {
	config     v1alpha1.EdgeController
	upstream   *controller.UpstreamController
	downstream *controller.DownstreamController
}
Starting the EdgeController:
func (ec *EdgeController) Start() {
	if err := ec.upstream.Start(); err != nil {
		klog.Exitf("start upstream failed with error: %s", err)
	}
	if err := ec.downstream.Start(); err != nil {
		klog.Exitf("start downstream failed with error: %s", err)
	}
}
DeviceController consists of an UpstreamController and a DownstreamController:
// DeviceController use beehive context message layer
type DeviceController struct {
	downstream *controller.DownstreamController
	upstream   *controller.UpstreamController
	enable     bool
}
The DownstreamController watches the Kubernetes API server and sends changes to the edge nodes.
// DownstreamController watch kubernetes api server and send change to edge
type DownstreamController struct {
	kubeClient         kubernetes.Interface
	messageLayer       messagelayer.MessageLayer
	deviceManager      *manager.DeviceManager
	deviceModelManager *manager.DeviceModelManager
	configMapManager   *manager.ConfigMapManager
}
DownstreamController.Start() launches two goroutines:
- go dc.syncDeviceModel()
- go dc.syncDevice()
// Start DownstreamController
func (dc *DownstreamController) Start() error {
	klog.Info("Start downstream devicecontroller")

	go dc.syncDeviceModel()

	// Wait for adding all device model
	// TODO need to think about sync
	time.Sleep(1 * time.Second)
	go dc.syncDevice()

	return nil
}
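The TODO in Start() points out that a fixed one-second sleep is only a rough way to make the device sync wait for the device models. The following is a minimal sketch of one alternative, not KubeEdge's implementation: it assumes the model sync can report when its initial pass is finished (the real syncDeviceModel and syncDevice are long-running loops), and the names syncInOrder and runDemo are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// syncInOrder starts both steps as goroutines, but the device sync only
// proceeds once the model sync has signalled that it is done.
// syncModels and syncDevices are hypothetical stand-ins for
// dc.syncDeviceModel and dc.syncDevice.
func syncInOrder(syncModels, syncDevices func()) {
	ready := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		syncModels()
		close(ready) // signal: device models are in place
	}()

	go func() {
		defer wg.Done()
		<-ready // block until the models are available
		syncDevices()
	}()

	wg.Wait()
}

// runDemo records the order in which the two steps actually ran.
func runDemo() []string {
	var mu sync.Mutex
	var order []string
	record := func(step string) func() {
		return func() {
			mu.Lock()
			defer mu.Unlock()
			order = append(order, step)
		}
	}
	syncInOrder(record("models"), record("devices"))
	return order
}

func main() {
	fmt.Println(runDemo())
}
```

Closing a channel instead of sleeping makes the ordering explicit: the second goroutine cannot start its work before the first one has finished, regardless of how long the first one takes.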
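The UpstreamController subscribes to messages from the edge nodes and syncs them to the Kubernetes API server:
CustomResourceDefinition (CRD): when you create a new CRD, the Kubernetes API server creates a new RESTful resource path for each version you specify, and resources of your own custom types can then be created under that API path. A CRD can be either namespaced or cluster-scoped, as specified in its scope field. As with built-in objects, deleting a namespace deletes all custom objects in that namespace.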
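To make the difference between the two scopes concrete, here is a small sketch that builds the REST path under which custom objects are served. The group example.com, the version v1 and the plural widgets are made-up values for illustration only, and resourcePath is a hypothetical helper, not part of any Kubernetes client library.

```go
package main

import "fmt"

// resourcePath builds the REST path the API server exposes for a custom
// resource. An empty namespace means the CRD is cluster-scoped.
func resourcePath(group, version, namespace, plural string) string {
	if namespace == "" {
		return fmt.Sprintf("/apis/%s/%s/%s", group, version, plural)
	}
	return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", group, version, namespace, plural)
}

func main() {
	// Namespaced CRD: objects live under a namespace segment.
	fmt.Println(resourcePath("example.com", "v1", "default", "widgets"))
	// Cluster-scoped CRD: no namespace segment in the path.
	fmt.Println(resourcePath("example.com", "v1", "", "widgets"))
}
```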
// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
	crdClient    crdClientset.Interface
	messageLayer messagelayer.MessageLayer
	// message channel
	deviceStatusChan chan model.Message
	// downstream controller to update device status in cache
	dc *DownstreamController
}
// Start UpstreamController
func (uc *UpstreamController) Start() error {
	klog.Info("Start upstream devicecontroller")

	uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
	go uc.dispatchMessage()

	for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
		go uc.updateDeviceStatus()
	}
	return nil
}
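UpstreamController.Start() above follows a common Go pattern: one buffered channel fed by a dispatcher, drained by a configurable number of worker goroutines. The sketch below shows that pattern in isolation. The statusUpdate type and the processUpdates function are hypothetical simplifications, and the body of each worker is only an assumption about what "updating device status" might look like.

```go
package main

import (
	"fmt"
	"sync"
)

// statusUpdate is a simplified, hypothetical stand-in for model.Message.
type statusUpdate struct {
	DeviceID string
	Status   string
}

// processUpdates pushes the updates through a buffered channel to `workers`
// goroutines and returns the status recorded for each device.
// With more than one worker, updates for the same device may be applied in
// any order, so the example uses one update per device.
func processUpdates(updates []statusUpdate, buffer, workers int) map[string]string {
	ch := make(chan statusUpdate, buffer)
	result := make(map[string]string)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for u := range ch { // each worker drains the shared channel
				mu.Lock()
				result[u.DeviceID] = u.Status
				mu.Unlock()
			}
		}()
	}

	for _, u := range updates { // the dispatcher side
		ch <- u
	}
	close(ch)
	wg.Wait()
	return result
}

func main() {
	updates := []statusUpdate{
		{"sensor-a", "online"},
		{"sensor-b", "offline"},
		{"sensor-c", "online"},
	}
	fmt.Println(len(processUpdates(updates, 2, 3)))
}
```

The buffer size bounds how many messages can wait before the dispatcher blocks, and the worker count bounds how many updates are processed at once, which is what the two configuration values in Start() control.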
cloudHub:
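type cloudHub struct {
	enable               bool
	informersSyncedFuncs []cache.InformerSynced
	messageq             *channelq.ChannelMessageQueue
}
DispatchMessage(): takes messages from the cloud, parses the node ID out of each one, and puts the message on the message queue for delivery to the corresponding node.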
func (a *cloudHub) Start() {
	...
	// start dispatch message from the cloud to edge node
	go a.messageq.DispatchMessage()
	...
	// HttpServer mainly used to issue certificates for the edge
	go httpserver.StartHTTPServer()

	servers.StartCloudHub(a.messageq)

	if hubconfig.Config.UnixSocket.Enable {
		// The uds server is only used to communicate with csi driver from kubeedge on cloud.
		// It is not used to communicate between cloud and edge.
		go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
	}
}
// StartCloudHub starts the cloud hub service
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
	handler.InitHandler(messageq)
	// start websocket server
	if hubconfig.Config.WebSocket.Enable {
		go startWebsocketServer()
	}
	// start quic server
	if hubconfig.Config.Quic.Enable {
		go startQuicServer()
	}
}
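The DispatchMessage() step described earlier (parse the node ID, then queue the message for that node) can be sketched as follows. This is an illustration under stated assumptions, not the ChannelMessageQueue code: the message type, the nodeQueues type and the resource layout "node/<nodeID>/<rest>" are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// message is a hypothetical, reduced stand-in for model.Message.
type message struct {
	Resource string // assumed layout: "node/<nodeID>/<rest>"
	Content  string
}

// nodeIDOf extracts the node ID from the message's resource string.
func nodeIDOf(m message) (string, error) {
	parts := strings.Split(m.Resource, "/")
	if len(parts) < 2 || parts[0] != "node" || parts[1] == "" {
		return "", errors.New("no node ID in resource: " + m.Resource)
	}
	return parts[1], nil
}

// nodeQueues keeps one FIFO queue per node. It is not safe for concurrent use.
type nodeQueues map[string][]message

// dispatch appends the message to the queue of the node it is addressed to.
func (q nodeQueues) dispatch(m message) error {
	id, err := nodeIDOf(m)
	if err != nil {
		return err
	}
	q[id] = append(q[id], m)
	return nil
}

func main() {
	q := nodeQueues{}
	for _, m := range []message{
		{Resource: "node/edge-1/default/pod/nginx", Content: "create"},
		{Resource: "node/edge-2/default/pod/redis", Content: "create"},
		{Resource: "node/edge-1/default/pod/nginx", Content: "delete"},
		{Resource: "membership", Content: "ignored"},
	} {
		if err := q.dispatch(m); err != nil {
			fmt.Println("skipped:", err)
		}
	}
	fmt.Println(len(q["edge-1"]), len(q["edge-2"]))
}
```

Keeping a separate queue per node means a slow or disconnected node only delays its own messages rather than the traffic for every other node.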
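KubeEdge edge components:
1. EdgeHub: communicates with CloudHub, mainly over WebSocket.
EdgeHub has two kinds of clients: an HTTP client and a WebSocket/QUIC client. The former requests the certificates that EdgeCore needs to communicate with CloudCore; the latter handles day-to-day communication with CloudCore (resource delivery, status reporting, and so on).
//EdgeHub defines edgehub object structure
type EdgeHub struct {
	certManager   certificate.CertManager
	chClient      clients.Adapter
	reconnectChan chan struct{}
	keeperLock    sync.RWMutex
	enable        bool
}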
eh.Start():
//Start sets context and starts the controller
func (eh *EdgeHub) Start() {
	eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
	eh.certManager.Start()

	HasTLSTunnelCerts <- true
	close(HasTLSTunnelCerts)

	go eh.ifRotationDone()

	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub stop")
			return
		default:
		}
		err := eh.initial()
		if err != nil {
			klog.Exitf("failed to init controller: %v", err)
			return
		}

		waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2

		err = eh.chClient.Init()
		if err != nil {
			klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
			time.Sleep(waitTime)
			continue
		}
		// execute hook func after connect
		eh.pubConnectInfo(true)
		go eh.routeToEdge()
		go eh.routeToCloud()
		go eh.keepalive()

		// wait the stop signal
		// stop authinfo manager/websocket connection
		<-eh.reconnectChan
		eh.chClient.UnInit()

		// execute hook fun after disconnect
		eh.pubConnectInfo(false)

		// sleep one period of heartbeat, then try to connect cloud hub again
		klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
		time.Sleep(waitTime)

		// clean channel
	clean:
		for {
			select {
			case <-eh.reconnectChan:
			default:
				break clean
			}
		}
	}
}
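The "clean channel" step at the end of the loop is worth looking at on its own: several goroutines may each have signalled a reconnect, and stale signals must be discarded before the next connection attempt. A minimal sketch of that non-blocking drain follows. The drain function is a hypothetical helper, and the buffered channel is an assumption made so the example can run in a single goroutine.

```go
package main

import "fmt"

// drain removes every pending signal from ch without blocking and reports
// how many signals were discarded.
func drain(ch chan struct{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++ // a stale signal was waiting; throw it away
		default:
			return n // nothing left to read, stop instead of blocking
		}
	}
}

func main() {
	ch := make(chan struct{}, 4)
	ch <- struct{}{}
	ch <- struct{}{}
	fmt.Println(drain(ch), len(ch))
}
```

The default branch is what keeps the loop from blocking: without it, the receive would wait forever once the channel is empty. The original code uses a labeled break for the same purpose because a plain break inside select only leaves the select, not the enclosing for loop.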
Start() first sets up the certificates, then calls eh.initial() and eh.chClient.Init() to create and initialize the cloudHubClient:
func (eh *EdgeHub) initial() (err error) {
	cloudHubClient, err := clients.GetClient()
	if err != nil {
		return err
	}
	eh.chClient = cloudHubClient
	return nil
}
Adapter is a WebSocket client interface.
//Adapter is a web socket client interface
type Adapter interface {
	Init() error // the client here is wsclient
	UnInit()
	// async mode
	Send(message model.Message) error
	Receive() (model.Message, error)
	// notify auth info
	Notify(authInfo map[string]string)
}
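The client here is wsclient, and Init() is the function that initializes the WebSocket. Start() then calls eh.chClient.Init(), i.e. the Init above, and this initialization establishes the WebSocket connection. Once connected, three goroutines are started: eh.routeToEdge(), eh.routeToCloud(), and eh.keepalive().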
// execute hook func after connect
eh.pubConnectInfo(true)
// routeToEdge(): receives messages via chClient.Receive() and dispatches them on the edge via eh.dispatch(message)
go eh.routeToEdge()
// routeToCloud(): receives messages from the edge via beehiveContext.Receive(modules.EdgeHubModuleName) and sends them to CloudHub via eh.sendToCloud(message)
go eh.routeToCloud()
go eh.keepalive()
eh.routeToEdge(): receives messages from CloudHub and dispatches them on the edge side.
func (eh *EdgeHub) routeToEdge() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToEdge stop")
			return
		default:
		}
		message, err := eh.chClient.Receive()
		if err != nil {
			klog.Errorf("websocket read error: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}
		klog.V(4).Infof("[edgehub/routeToEdge] receive msg from cloud, msg:% +v", message)
		err = eh.dispatch(message)
		if err != nil {
			klog.Errorf("failed to dispatch message, discard: %v", err)
		}
	}
}
eh.routeToCloud(): receives messages sent to the EdgeHub module and forwards them to CloudHub.
func (eh *EdgeHub) routeToCloud() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToCloud stop")
			return
		default:
		}
		message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
		if err != nil {
			klog.Errorf("failed to receive message from edge: %v", err)
			time.Sleep(time.Second)
			continue
		}
		// post message to cloud hub
		err = eh.sendToCloud(message)
		if err != nil {
			klog.Errorf("failed to send message to cloud: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}
	}
}
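Both routing loops share one idea: when the connection fails, the goroutine signals reconnectChan and exits, and Start() reacts by tearing the client down and reconnecting. The sketch below isolates that handshake. The receiver interface is a hypothetical, reduced version of Adapter, scriptedClient is a test double invented for the example, and messages are plain strings rather than model.Message.

```go
package main

import (
	"errors"
	"fmt"
)

// receiver is a hypothetical, reduced version of the Adapter interface.
type receiver interface {
	Receive() (string, error)
}

// scriptedClient returns its messages in order and then fails,
// imitating a connection that drops.
type scriptedClient struct {
	msgs []string
}

func (c *scriptedClient) Receive() (string, error) {
	if len(c.msgs) == 0 {
		return "", errors.New("connection closed")
	}
	m := c.msgs[0]
	c.msgs = c.msgs[1:]
	return m, nil
}

// routeToEdge forwards each received message to dispatch until Receive
// fails, then asks the owner to reconnect and exits.
func routeToEdge(c receiver, dispatch func(string), reconnect chan<- struct{}) {
	for {
		msg, err := c.Receive()
		if err != nil {
			reconnect <- struct{}{}
			return
		}
		dispatch(msg)
	}
}

// run starts the loop and waits for its reconnect signal, the way the
// owner of the connection would, then returns what was dispatched.
func run() []string {
	reconnect := make(chan struct{})
	var got []string
	client := &scriptedClient{msgs: []string{"a", "b"}}
	go routeToEdge(client, func(m string) { got = append(got, m) }, reconnect)
	<-reconnect // blocks until the loop reports the failure
	return got
}

func main() {
	fmt.Println(run())
}
```

Because the loop returns right after signalling, no goroutine keeps using a dead connection; the owner is free to close it and start fresh loops after the next successful Init().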
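metaManager: passes messages between Edged and EdgeHub, and stores and retrieves device metadata in SQLite.
When the metamanager module starts, it launches two goroutines: one periodically (every 60s by default) sends itself a message that triggers an edge-to-cloud sync of pod status data; the other processes the data exchanged between edgehub and edged/edgemesh.
Data arriving at metamanager comes from two sources: the first is edgehub, carrying cloud-to-edge data (denoted ①); the second is edged, carrying edge-to-cloud data (denoted ②).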
func (m *metaManager) Start() {
	if metaserverconfig.Config.Enable {
		imitator.StorageInit()
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	}
	go func() {
		period := getSyncInterval()
		timer := time.NewTimer(period)
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager stop")
				return
			case <-timer.C:
				timer.Reset(period)
				msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
				beehiveContext.Send(MetaManagerModuleName, *msg)
			}
		}
	}()
	m.runMetaManager()
}
func (m *metaManager) runMetaManager() {
	go func() {
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			}
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil {
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			}
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		}
	}()
}
m.process(message): // persistence
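The periodic sync goroutine in Start() is a timer loop that stops when the shared context is cancelled. Here is a minimal, self-contained sketch of that pattern using the standard context package in place of beehiveContext. The names runPeriodic, countTicks and demoTicks are hypothetical, and the one-millisecond period is only there to keep the example fast.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPeriodic calls tick once per period until ctx is cancelled.
func runPeriodic(ctx context.Context, period time.Duration, tick func()) {
	timer := time.NewTimer(period)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			timer.Reset(period) // re-arm before doing the work
			tick()
		}
	}
}

// countTicks runs the loop until n ticks were observed, then cancels it
// and waits for the goroutine to exit.
func countTicks(n int, period time.Duration) int {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		runPeriodic(ctx, period, func() {
			select {
			case ticks <- struct{}{}:
			case <-ctx.Done(): // do not block forever after cancellation
			}
		})
	}()

	count := 0
	for count < n {
		<-ticks
		count++
	}
	cancel()
	<-done
	return count
}

// demoTicks observes three ticks of a one-millisecond timer.
func demoTicks() int {
	return countTicks(3, time.Millisecond)
}

func main() {
	fmt.Println(demoTicks())
}
```

In metamanager the tick body sends a sync message to the module itself, so the periodic work flows through the same message loop as everything else instead of touching state from a second goroutine.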
Edged:
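When edged starts, it first initializes and starts each of its modules, and finally performs the pod sync. The following uses the creation of a single pod to show how the modules in edged cooperate to manage the pod lifecycle.
When edged receives a pod insert message, it adds the full pod information to podmanager and probemanager, and adds the namespace/name key to podAdditionQueue.
A goroutine is then started to create the pods delivered to this node.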
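The insert flow just described (record the pod, queue its key, let a worker goroutine create it) can be sketched like this. It is an illustration only: pod, podStore, handleInsert and createQueued are hypothetical names, a plain channel stands in for podAdditionQueue, and "creating" a pod is reduced to collecting its key.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a hypothetical, reduced stand-in for the Kubernetes Pod object.
type pod struct {
	Namespace, Name string
}

// key returns the "namespace/name" string used to identify the pod.
func (p pod) key() string { return p.Namespace + "/" + p.Name }

// podStore plays the role of the pod manager: it remembers pods by key.
type podStore struct {
	mu   sync.Mutex
	pods map[string]pod
}

// handleInsert records the pod, then queues its key for the creation worker.
func (s *podStore) handleInsert(p pod, queue chan<- string) {
	s.mu.Lock()
	s.pods[p.key()] = p
	s.mu.Unlock()
	queue <- p.key()
}

// createQueued takes keys from the queue, looks each pod up and "creates" it.
// It returns the created keys once the queue has been closed.
func (s *podStore) createQueued(queue <-chan string) []string {
	var created []string
	for k := range queue {
		s.mu.Lock()
		_, ok := s.pods[k]
		s.mu.Unlock()
		if ok {
			created = append(created, k)
		}
	}
	return created
}

// demo inserts two pods and returns the keys the worker created.
func demo() []string {
	store := &podStore{pods: make(map[string]pod)}
	queue := make(chan string, 8)
	result := make(chan []string)

	go func() { result <- store.createQueued(queue) }()

	store.handleInsert(pod{"default", "nginx"}, queue)
	store.handleInsert(pod{"kube-system", "dns"}, queue)
	close(queue)

	return <-result
}

func main() {
	fmt.Println(demo())
}
```

Queuing only the key, not the pod object, means the worker always reads the latest stored version of the pod when it gets around to creating it.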