kubeedge设备添加以及mapper管理

Posted Kris_u

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kubeedge设备添加以及mapper管理相关的知识,希望对你有一定的参考价值。

kubeedge架构:包括云端和边端两部分 。

In the Cloud

  • CloudHub: a web socket server responsible for watching changes at the cloud side, caching and sending messages to EdgeHub.( CloudHub是一个websocket 服务器,负责监控云边的变化、缓存以及向EdgeHub发送消息message)
  • EdgeController: an extended kubernetes controller which manages edge nodes and pods metadata so that the data can be targeted to a specific edge node.(EdgeController 是一个扩展的K8S控制器:负责管理边缘节点和pods 的元数据,因此数据才能被发送到指定的边缘节点)
  • DeviceController: an extended kubernetes controller which manages devices so that the device metadata/status data can be synced between edge and cloud.DeviceController 也是一个扩展性的K8S控制器,负责管理devices, 实现device元数据/状态数据在云端与边缘端的同步.) 

On the Edge

  • EdgeHub: a web socket client responsible for interacting with Cloud Service for the edge computing (like Edge Controller as in the KubeEdge Architecture). This includes syncing cloud-side resource updates to the edge, and reporting edge-side host and device status changes to the cloud. (EdgeHub 是一个websocket 客户端,负责与云边服务交互实现边缘计算『类似KubeEdge 架构中的Edge Controller』。其中包括将云边资源同步更新到边缘端以及将边端主机、设备状态变化广播至云端。)
  • Edged: an agent that runs on edge nodes and manages containerized applications. (Edged 是一个运行在边缘节点管理容器化应用的代理。)
  • EventBus: a MQTT client to interact with MQTT servers (mosquitto), offering publish and subscribe capabilities to other components. EventBus 是一个MQTT客户端负责与MQTT服务器mosquitto的交互,为其他组件提供发布与订阅功能。)
  • ServiceBus: a HTTP client to interact with HTTP servers (REST), offering HTTP client capabilities to components of cloud to reach HTTP servers running at edge.ServiceBus 是一个HTTP客户端与HTTP服务器使用REST进行交互,为云端组件提供HTTP客户端功能,使其请求到达运行在边缘端的HTTP服务器。)
  • DeviceTwin: responsible for storing device status and syncing device status to the cloud. It also provides query interfaces for applications.(DeviceTwin 负责存储设备状态,并将设备状态同步到云端,同时也提供了了应用的查询接口。)
  • MetaManager: the message processor between edged and edgehub. It is also responsible for storing/retrieving metadata to/from a lightweight database (SQLite).(MetaManager 是edged与edgehub之间的message 处理器,同时,也负责将元数据存储/查询 到/从 一个轻量级数据库SQLite。)

 KubeEdge 借助 Kubernetes CRD 和与正在使用的设备相对应的设备映射器(Mapper)来支持设备管理。 目前,我们借助设备控制器(Device Controller)和设备孪生(device twin)模块从云端进行设备管理,并在边缘节点和云端进行设备更新的同步。

 

下面以examples/web-demo at master · kubeedge/examples · GitHub为例简单介绍添加device设备和添加mapper的流程。

Firstly the users open a browser, and enter the web app page by the web app link, choose the music and click the button Play in the web page, at last the expected track is pushed to the edge node and the track is played on the speaker connected to the edge node.

大致功能就是:通过一个web页面选择要播放的歌曲名称,该HTTP客户端部署在云端,然后通过K8S 的CRD 控制将请求发送至指定节点,mapper负责从MQTT订阅云边同步到边端的massage, mapper负责控制边缘端的设备,同时将边缘端的设备状态通过MQTT的publish功能同步到云端。

CRD提供了:

  1. 用于从云中管理设备的API
  2. 在云节点和边缘节点之间同步设备更新

用户可以做到:

  • 描述设备属性。
    • 用户可以描述设备属性和访问机制,以便与设备交互/控制设备。
  • 从云中对设备执行CRUD操作。
    • 用户可以通过Kubernetes API服务器公开的CRD API从云中创建、更新和删除设备元数据。
    • 用户可以通过CRD API控制设备属性的预期(desired)状态。
  • 报告设备属性值。
    • 在边缘上运行的Mapper应用可以报告设备属性的当前值。

官方文档:     https://kubeedge.io/en/docs/developer/device_crd/

Device Controller 利用Device Model和Device Instance实现设备管理。

Device Model描述了设备属性,例如温度压力 摄像头模型就像一个可重复使用的模板,使用它可以创建和管理许多摄像头设备。 

                         Device Twin 属性期望值从云同步至边缘侧流程图

 Device Model Sample:

apiVersion: devices.kubeedge.io/v1alpha2
kind: DeviceModel
metadata:
 name: sensor-tag-model
 namespace: default
spec:
 properties:
  - name: temperature
    description: temperature in degree celsius
    type:
     int:
      accessMode: ReadWrite
      maximum: 100
      unit: degree celsius
  - name: temperature-enable
    description: enable data collection of temperature sensor
    type:
      string:
        accessMode: ReadWrite
        defaultValue: 'OFF'

Device Instance则代表一个实际的设备对象 它就像设备模型的实例化,并引用模型中定义的属性,这些属性由属性访问者公开以访问。 设备规范是静态的,而设备状态包含动态变化的数据,例如设备属性的期望状态真实状态

设备孪生(device twin)是设备的动态属性,表示具体设备的专有实时数据,例如摄像头的开/关状态。在设备孪生(twin)中,进一步定义了desired value(期望值)”和“reported value(真实值)”。其中,“desired value”指的是控制面希望设备达到的状态

一方面,Device-Twin 负责设备状态的存储,处理设备属性以及边缘设备与边缘节点之间的关系;另一方面,Device Controller则负责设备Metadata/Status在云边间的信息同步。

Device Instance Sample:

apiVersion: devices.kubeedge.io/v1alpha2
kind: Device
metadata:
  name: sensor-tag-instance-01
  labels:
    description: TISimplelinkSensorTag
    manufacturer: TexasInstruments
    model: CC2650
spec:
  deviceModelRef:
    name: sensor-tag-model
  protocol:
    modbus:
      slaveID: 1
    common:
      com:
        serialPort: '1'
        baudRate: 115200
        dataBits: 8
        parity: even
        stopBits: 1
  nodeSelector:
    nodeSelectorTerms:
    - matchExpressions:
      - key: ''
        operator: In
        values:
        - node1
  propertyVisitors:
    - propertyName: temperature
      modbus:
        register: CoilRegister
        offset: 2
        limit: 1
        scale: 1
        isSwap: true
        isRegisterSwap: true
    - propertyName: temperature-enable
      modbus:
        register: DiscreteInputRegister
        offset: 3
        limit: 1
        scale: 1.0
        isSwap: true
        isRegisterSwap: true
  data:
    dataTopic: "$ke/events/device/+/data/update"
    dataProperties:
      - propertyName: pressure
        metadata:
          type: int
      - propertyName: temperature
        metadata:
          type: int
status:
  twins:
    - propertyName: temperature
      reported:
        metadata:
          timestamp: '1550049403598'
          type: int
        value: '10'
      desired:
        metadata:
          timestamp: '1550049403598'
          type: int
        value: '15'

创建device-instance.yaml和deviceModel.yaml

examples/web-demo/kubeedge-web-app/deployments/kubeedge-speaker-instance.yaml

apiVersion: devices.kubeedge.io/v1alpha2
kind: Device
metadata:
  name: speaker-01 //device 名称,部署之后可以通过kubectl get device speaker-01查询该设备
  labels:
    description: 'Speaker'
    manufacturer: 'test'
spec:
  deviceModelRef:
    name: speaker-model
  nodeSelector:   //负责将指定设备所属的节点,即控制哪个边缘节点的播放器
    nodeSelectorTerms:
    - matchExpressions:
      - key: ''
        operator: In
        values:
        - raspberrypi  //raspberrypi是边缘节点,在树梅派上面播放音乐

 examples/web-demo/kubeedge-web-app/deployments/kubeedge-speaker-model.yaml

apiVersion: devices.kubeedge.io/v1alpha2
kind: DeviceModel
metadata:
 name: speaker-model
 namespace: default
spec:
 properties:
  - name: track
    description: music track to play
    type:
     string:
      accessMode: ReadWrite
      defaultValue: ''

Usage of Device CRD

The following are the steps to

  1. Create a device model in the cloud node.

            kubectl apply -f <path to device model yaml>
    
  2. Create a device instance in the cloud node.

           kubectl apply -f <path to device instance yaml>
    

云端通过K8S API server修改设备状态

K8s API Server  >>> 云端的CloudController >>>DeviceTwin >>>EventBus >>>MQTT Broker>>>Mapper(MQTT client.Publish/Subscribe)

web-demo 通过K8s api server 修改device status:

// DeviceStatus is used to patch device status
type DeviceStatus struct 
	Status v1alpha2.DeviceStatus `json:"status"`


// The device id of the speaker
var deviceID = "speaker-01"

// The default namespace in which the speaker device instance resides
var namespace = "default"

// The CRD client used to patch the device instance.
var crdClient *rest.RESTClient

func init() 
	// Create a client to talk to the K8S API server to patch the device CRDs
	kubeConfig, err := utils.KubeConfig()
	if err != nil 
		log.Fatalf("Failed to create KubeConfig, error : %v", err)
	
	log.Println("Get kubeConfig successfully")

	crdClient, err = utils.NewCRDClient(kubeConfig)
	if err != nil 
		log.Fatalf("Failed to create device crd client , error : %v", err)
	
	log.Println("Get crdClient successfully")


// UpdateDeviceTwinWithDesiredTrack patches the desired state of
// the device twin with the track to play.
func UpdateDeviceTwinWithDesiredTrack(track string) bool 
	status := buildStatusWithDesiredTrack(track)
	deviceStatus := &DeviceStatusStatus: status
	body, err := json.Marshal(deviceStatus)
	if err != nil 
		log.Printf("Failed to marshal device status %v", deviceStatus)
		return false
	
	result := crdClient.Patch(utils.MergePatchType).Namespace(namespace).Resource(utils.ResourceTypeDevices).Name(deviceID).Body(body).Do(context.TODO())
	if result.Error() != nil 
		log.Printf("Failed to patch device status %v of device %v in namespace %v \\n error:%+v", deviceStatus, deviceID, namespace, result.Error())
		return false
	 else 
		log.Printf("Track [ %s ] will be played on speaker %s", track, deviceID)
	
	return true


func buildStatusWithDesiredTrack(song string) v1alpha2.DeviceStatus 
	metadata := map[string]string"timestamp": strconv.FormatInt(time.Now().Unix()/1e6, 10),
		"type": "string",
	
	twins := []v1alpha2.TwinPropertyName: "track", Desired: v1alpha2.TwinPropertyValue: song, Metadata: metadata, Reported: v1alpha2.TwinPropertyValue: "unknown", Metadata: metadata
	devicestatus := v1alpha2.DeviceStatusTwins: twins
	return devicestatus

Mapper:  是一个用于连接和控制设备的应用。

mapper的主要功能如下:

  • Scan and connect to the device.(扫描和连接设备)
  • Report the actual state of twin-attributes of device.(广播设备真实状态的TWIN属性)
  • Map the expected state of device-twin to actual state of device-twin.(将期望的设备状态映射到设备的真实状态。)
  • Collect telemetry data from device.(收集设备的遥测数据)
  • Convert readings from device to format accepted by KubeEdge.(将从设备读取的信息转换为kubeedge可以接受的格式)
  • Schedule actions on the device.设备的调度
  • Check health of the device.检查设备的健康状况

                                                 Mapper 更新设备属性同步至云端流程图

 

Message Topics

KubeEdge 使用 MQTT 作为 deviceTwin 和devices/apps之间的通信. EventBus can be started in multiple MQTT modes and acts as an interface for sending/receiving messages on relevant MQTT topics.

Subscribe Topics

On starting EventBus, it subscribes to these 5 topics:

1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"
6. "$ke/events/+/device/data/update"

If the message is received on first 3 topics, the message is sent to deviceTwin, else the message is sent to cloud via edgeHub.

We will focus on the message expected on the first 3 topics.

  1. "$hw/events/node/+/membership/get": This topic is used to get membership details of a node i.e the devices that are associated with the node. The response of the message is published on "$hw/events/node/+/membership/get/result" topic.

  2. "$hw/events/device/+/state/update": This topic is used to update the state of the device. + symbol can be replaced with ID of the device whose state is to be updated.

  3. "$hw/events/device/+/twin/+": The two + symbols can be replaced by the deviceID on whose twin the operation is to be performed and any one of(update,cloud_updated,get) respectively.

  4. "$ke/events/device/+/data/update" This topic is add in KubeEdge v1.4, and used for delivering time-serial data. This topic is not processed by edgecore, instead, they should be processed by third-party component on edge node such as EMQ Kuiper.

web-demo的mapper实现代码如下:

main.go主要实现了一下三个功能:

首先,读取音乐文件存入map

其次,创建MQTT客户端并连接

然后,从MQTT subscribe订阅来自云端的message,读取期望的设备状态,即拿到需要播放的音乐曲目

最后,调用树梅派上的音乐播放器,播放音乐。

package main

import (
...
)

var musicDir = "/home/pi/music/"

func main() 
	// Get music files
	files, err := os.Open(musicDir)
    ...
    //init MQTT client
	cli := client.New(&client.Options
		// Define the processing of the error handler.
		ErrorHandler: func(err error) 
			fmt.Println(err)
		,
	)

	fmt.Println("Create mqtt client successfully")

	stopchan := make(chan int)//这里使用无缓冲的通道防止程序退出,没有数据写入通道,通道将一直阻塞
	// Terminate the Client.
	defer cli.Terminate()//这里有点bug,程序不会退出,也就不会执行defer函数

	// Connect to the MQTT Server.
	err = cli.Connect(&client.ConnectOptions
		Network:  "tcp",
		Address:  "localhost:1883",
		ClientID: []byte("receive-client"),
	)
	if err != nil 
		panic(err)
	
	fmt.Println("Connect mqtt client successfully")

	err = cli.Subscribe(&client.SubscribeOptions
		SubReqs: []*client.SubReq
			
				TopicFilter: []byte(`$hw/events/device/speaker-01/twin/update/document`),
				QoS:         mqtt.QoS0,
				// Define the processing of the message handler.
				Handler: func(topicName, message []byte) 
					Update := &types.DeviceTwinDocument
					err := json.Unmarshal(message, Update)
					if err != nil 
						fmt.Println("Unmarshal error", err)
						fmt.Printf("Unmarshal error: %v\\n", err)
					
					cmd := exec.Command("pkill", "-9", "omxplayer")
					cmd.Run()

					trackToPlay := *Update.Twin["track"].CurrentState.Expected.Value
					fmt.Printf("Receive expected track: %s\\n", trackToPlay)

					// Stop music
					if trackToPlay == "stop" 
						return
					

					_, ok := m[trackToPlay]
					if !ok 
						fmt.Printf("Could not find song %s in playlist\\n", trackToPlay)
						trackToPlay = MapRandomKeyGet(m).(string)
						fmt.Printf("Selected random track %s to play\\n", trackToPlay)
					
					fmt.Printf("Playing track: %s\\n", musicDir+trackToPlay+".mp3")
					cmd = exec.Command("omxplayer", "-o", "local", musicDir+trackToPlay+".mp3")
					err = cmd.Run()
					if err != nil 
						fmt.Printf("Error while playing track: %v\\n", err)
					
				,
			,
		,
	)
	fmt.Println("Subscribe mqtt topic successfully")

	<-stopchan
	if err != nil 
		panic(err)
	 else 
		fmt.Println("Connection successfully")
	


func MapRandomKeyGet(mapI interface) interface 
	keys := reflect.ValueOf(mapI).MapKeys()

	return keys[rand.Intn(len(keys))].Interface()

以上是关于kubeedge设备添加以及mapper管理的主要内容,如果未能解决你的问题,请参考以下文章

kubeedge设备添加以及mapper管理

玩转KubeEdge

玩转KubeEdge

KubeEdge攻略

玩转KubeEdge

KubeEdge:下一代云原生边缘设备管理标准DMI的设计与实现