kubeedge设备添加以及mapper管理
Posted Kris_u
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kubeedge设备添加以及mapper管理相关的知识,希望对你有一定的参考价值。
kubeedge架构:包括云端和边端两部分 。
In the Cloud
- CloudHub: a web socket server responsible for watching changes at the cloud side, caching and sending messages to EdgeHub.( CloudHub是一个websocket 服务器,负责监控云边的变化、缓存以及向EdgeHub发送消息message)
- EdgeController: an extended kubernetes controller which manages edge nodes and pods metadata so that the data can be targeted to a specific edge node.(EdgeController 是一个扩展的K8S控制器:负责管理边缘节点和pods 的元数据,因此数据才能被发送到指定的边缘节点)
- DeviceController: an extended kubernetes controller which manages devices so that the device metadata/status data can be synced between edge and cloud.(DeviceController 也是一个扩展性的K8S控制器,负责管理devices, 实现device元数据/状态数据在云端与边缘端的同步.)
On the Edge
- EdgeHub: a web socket client responsible for interacting with Cloud Service for the edge computing (like Edge Controller as in the KubeEdge Architecture). This includes syncing cloud-side resource updates to the edge, and reporting edge-side host and device status changes to the cloud. (EdgeHub 是一个websocket 客户端,负责与云边服务交互实现边缘计算『类似KubeEdge 架构中的Edge Controller』。其中包括将云边资源同步更新到边缘端以及将边端主机、设备状态变化广播至云端。)
- Edged: an agent that runs on edge nodes and manages containerized applications. (Edged 是一个运行在边缘节点管理容器化应用的代理。)
- EventBus: a MQTT client to interact with MQTT servers (mosquitto), offering publish and subscribe capabilities to other components. (EventBus 是一个MQTT客户端负责与MQTT服务器mosquitto的交互,为其他组件提供发布与订阅功能。)
- ServiceBus: a HTTP client to interact with HTTP servers (REST), offering HTTP client capabilities to components of cloud to reach HTTP servers running at edge.(ServiceBus 是一个HTTP客户端与HTTP服务器使用REST进行交互,为云端组件提供HTTP客户端功能,使其请求到达运行在边缘端的HTTP服务器。)
- DeviceTwin: responsible for storing device status and syncing device status to the cloud. It also provides query interfaces for applications.(DeviceTwin 负责存储设备状态,并将设备状态同步到云端,同时也提供了了应用的查询接口。)
- MetaManager: the message processor between edged and edgehub. It is also responsible for storing/retrieving metadata to/from a lightweight database (SQLite).(MetaManager 是edged与edgehub之间的message 处理器,同时,也负责将元数据存储/查询 到/从 一个轻量级数据库SQLite。)
以examples/web-demo at master · kubeedge/examples · GitHub为例简单介绍添加device设备和添加mapper的流程。
Firstly the users open a browser, and enter the web app page by the web app link, choose the music and click the button Play
in the web page, at last the expected track is pushed to the edge node and the track is played on the speaker connected to the edge node.
大致功能就是:通过一个web页面选择要播放的歌曲名称,该HTTP客户端部署在云端,然后通过K8S 的CRD 控制将请求发送至指定节点,mapper负责从MQTT订阅云边同步到边端的massage, mapper负责控制边缘端的设备,同时将边缘端的设备状态通过MQTT的publish功能同步到云端。
CRD提供了:
- 用于从云中管理设备的API
- 在云节点和边缘节点之间同步设备更新
用户可以做到:
- 描述设备属性。
- 用户可以描述设备属性和访问机制,以便与设备交互/控制设备。
- 从云中对设备执行CRUD操作。
- 用户可以通过Kubernetes API服务器公开的CRD API从云中创建、更新和删除设备元数据。
- 用户可以通过CRD API控制设备属性的预期(desired)状态。
- 报告设备属性值。
- 在边缘上运行的Mapper应用可以报告设备属性的当前值。
官方文档: https://kubeedge.io/en/docs/developer/device_crd/
Device Model Sample:
apiVersion: devices.kubeedge.io/v1alpha2
kind: DeviceModel
metadata:
name: sensor-tag-model
namespace: default
spec:
properties:
- name: temperature
description: temperature in degree celsius
type:
int:
accessMode: ReadWrite
maximum: 100
unit: degree celsius
- name: temperature-enable
description: enable data collection of temperature sensor
type:
string:
accessMode: ReadWrite
defaultValue: 'OFF'
Device Instance Sample:
apiVersion: devices.kubeedge.io/v1alpha2
kind: Device
metadata:
name: sensor-tag-instance-01
labels:
description: TISimplelinkSensorTag
manufacturer: TexasInstruments
model: CC2650
spec:
deviceModelRef:
name: sensor-tag-model
protocol:
modbus:
slaveID: 1
common:
com:
serialPort: '1'
baudRate: 115200
dataBits: 8
parity: even
stopBits: 1
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: ''
operator: In
values:
- node1
propertyVisitors:
- propertyName: temperature
modbus:
register: CoilRegister
offset: 2
limit: 1
scale: 1
isSwap: true
isRegisterSwap: true
- propertyName: temperature-enable
modbus:
register: DiscreteInputRegister
offset: 3
limit: 1
scale: 1.0
isSwap: true
isRegisterSwap: true
data:
dataTopic: "$ke/events/device/+/data/update"
dataProperties:
- propertyName: pressure
metadata:
type: int
- propertyName: temperature
metadata:
type: int
status:
twins:
- propertyName: temperature
reported:
metadata:
timestamp: '1550049403598'
type: int
value: '10'
desired:
metadata:
timestamp: '1550049403598'
type: int
value: '15'
创建device-instance.yaml和deviceModel.yaml
examples/web-demo/kubeedge-web-app/deployments/kubeedge-speaker-instance.yaml
apiVersion: devices.kubeedge.io/v1alpha2
kind: Device
metadata:
name: speaker-01 //device 名称,部署之后可以通过kubectl get device speaker-01查询该设备
labels:
description: 'Speaker'
manufacturer: 'test'
spec:
deviceModelRef:
name: speaker-model
nodeSelector: //负责将指定设备所属的节点,即控制哪个边缘节点的播放器
nodeSelectorTerms:
- matchExpressions:
- key: ''
operator: In
values:
- raspberrypi //raspberrypi是边缘节点,在树梅派上面播放音乐
examples/web-demo/kubeedge-web-app/deployments/kubeedge-speaker-model.yaml
apiVersion: devices.kubeedge.io/v1alpha2
kind: DeviceModel
metadata:
name: speaker-model
namespace: default
spec:
properties:
- name: track
description: music track to play
type:
string:
accessMode: ReadWrite
defaultValue: ''
Usage of Device CRD
The following are the steps to
-
Create a device model in the cloud node.
kubectl apply -f <path to device model yaml>
-
Create a device instance in the cloud node.
kubectl apply -f <path to device instance yaml>
云端通过K8S API server修改设备状态
K8s API Server >>> 云端的CloudController >>>DeviceTwin >>>EventBus >>>MQTT Broker>>>Mapper(MQTT client.Publish/Subscribe)
web-demo 通过K8s api server 修改device status:
// DeviceStatus is used to patch device status
type DeviceStatus struct
Status v1alpha2.DeviceStatus `json:"status"`
// The device id of the speaker
var deviceID = "speaker-01"
// The default namespace in which the speaker device instance resides
var namespace = "default"
// The CRD client used to patch the device instance.
var crdClient *rest.RESTClient
func init()
// Create a client to talk to the K8S API server to patch the device CRDs
kubeConfig, err := utils.KubeConfig()
if err != nil
log.Fatalf("Failed to create KubeConfig, error : %v", err)
log.Println("Get kubeConfig successfully")
crdClient, err = utils.NewCRDClient(kubeConfig)
if err != nil
log.Fatalf("Failed to create device crd client , error : %v", err)
log.Println("Get crdClient successfully")
// UpdateDeviceTwinWithDesiredTrack patches the desired state of
// the device twin with the track to play.
func UpdateDeviceTwinWithDesiredTrack(track string) bool
status := buildStatusWithDesiredTrack(track)
deviceStatus := &DeviceStatusStatus: status
body, err := json.Marshal(deviceStatus)
if err != nil
log.Printf("Failed to marshal device status %v", deviceStatus)
return false
result := crdClient.Patch(utils.MergePatchType).Namespace(namespace).Resource(utils.ResourceTypeDevices).Name(deviceID).Body(body).Do(context.TODO())
if result.Error() != nil
log.Printf("Failed to patch device status %v of device %v in namespace %v \\n error:%+v", deviceStatus, deviceID, namespace, result.Error())
return false
else
log.Printf("Track [ %s ] will be played on speaker %s", track, deviceID)
return true
func buildStatusWithDesiredTrack(song string) v1alpha2.DeviceStatus
metadata := map[string]string"timestamp": strconv.FormatInt(time.Now().Unix()/1e6, 10),
"type": "string",
twins := []v1alpha2.TwinPropertyName: "track", Desired: v1alpha2.TwinPropertyValue: song, Metadata: metadata, Reported: v1alpha2.TwinPropertyValue: "unknown", Metadata: metadata
devicestatus := v1alpha2.DeviceStatusTwins: twins
return devicestatus
Mapper: 是一个用于连接和控制设备的应用。
mapper的主要功能如下:
- Scan and connect to the device.(描和连接设备)
- Report the actual state of twin-attributes of device.(广播设备真实状态的TWIN属性)
- Map the expected state of device-twin to actual state of device-twin.(将期望的设备状态映射到设备的真实状态。)
- Collect telemetry data from device.(收集设备的遥测数据)
- Convert readings from device to format accepted by KubeEdge.(将从设备读取的信息转换为kubeedge可以接受的格式)
- Schedule actions on the device.设备的调度
- Check health of the device.检查设备的健康状况
Message Topics
KubeEdge 使用 MQTT 作为 deviceTwin 和devices/apps之间的通信. EventBus can be started in multiple MQTT modes and acts as an interface for sending/receiving messages on relevant MQTT topics.
Subscribe Topics
On starting EventBus, it subscribes to these 5 topics:
1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"
6. "$ke/events/+/device/data/update"
If the message is received on first 3 topics, the message is sent to deviceTwin, else the message is sent to cloud via edgeHub.
We will focus on the message expected on the first 3 topics.
-
"$hw/events/node/+/membership/get"
: This topic is used to get membership details of a node i.e the devices that are associated with the node. The response of the message is published on"$hw/events/node/+/membership/get/result"
topic. -
"$hw/events/device/+/state/update"
: This topic is used to update the state of the device. + symbol can be replaced with ID of the device whose state is to be updated. -
"$hw/events/device/+/twin/+"
: The two + symbols can be replaced by the deviceID on whose twin the operation is to be performed and any one of(update,cloud_updated,get) respectively. -
"$ke/events/device/+/data/update"
This topic is add in KubeEdge v1.4, and used for delivering time-serial data. This topic is not processed by edgecore, instead, they should be processed by third-party component on edge node such as EMQ Kuiper.
web-demo的mapper实现代码如下:
main.go主要实现了一下三个功能:
首先,读取音乐文件存入map
其次,创建MQTT客户端并连接
然后,从MQTT subscribe订阅来自云端的message,读取期望的设备状态,即拿到需要播放的音乐曲目
最后,调用树梅派上的音乐播放器,播放音乐。
package main
import (
...
)
var musicDir = "/home/pi/music/"
func main()
// Get music files
files, err := os.Open(musicDir)
...
//init MQTT client
cli := client.New(&client.Options
// Define the processing of the error handler.
ErrorHandler: func(err error)
fmt.Println(err)
,
)
fmt.Println("Create mqtt client successfully")
stopchan := make(chan int)//这里使用无缓冲的通道防止程序退出,没有数据写入通道,通道将一直阻塞
// Terminate the Client.
defer cli.Terminate()//这里有点bug,程序不会退出,也就不会执行defer函数
// Connect to the MQTT Server.
err = cli.Connect(&client.ConnectOptions
Network: "tcp",
Address: "localhost:1883",
ClientID: []byte("receive-client"),
)
if err != nil
panic(err)
fmt.Println("Connect mqtt client successfully")
err = cli.Subscribe(&client.SubscribeOptions
SubReqs: []*client.SubReq
TopicFilter: []byte(`$hw/events/device/speaker-01/twin/update/document`),
QoS: mqtt.QoS0,
// Define the processing of the message handler.
Handler: func(topicName, message []byte)
Update := &types.DeviceTwinDocument
err := json.Unmarshal(message, Update)
if err != nil
fmt.Println("Unmarshal error", err)
fmt.Printf("Unmarshal error: %v\\n", err)
cmd := exec.Command("pkill", "-9", "omxplayer")
cmd.Run()
trackToPlay := *Update.Twin["track"].CurrentState.Expected.Value
fmt.Printf("Receive expected track: %s\\n", trackToPlay)
// Stop music
if trackToPlay == "stop"
return
_, ok := m[trackToPlay]
if !ok
fmt.Printf("Could not find song %s in playlist\\n", trackToPlay)
trackToPlay = MapRandomKeyGet(m).(string)
fmt.Printf("Selected random track %s to play\\n", trackToPlay)
fmt.Printf("Playing track: %s\\n", musicDir+trackToPlay+".mp3")
cmd = exec.Command("omxplayer", "-o", "local", musicDir+trackToPlay+".mp3")
err = cmd.Run()
if err != nil
fmt.Printf("Error while playing track: %v\\n", err)
,
,
,
)
fmt.Println("Subscribe mqtt topic successfully")
<-stopchan
if err != nil
panic(err)
else
fmt.Println("Connection successfully")
func MapRandomKeyGet(mapI interface) interface
keys := reflect.ValueOf(mapI).MapKeys()
return keys[rand.Intn(len(keys))].Interface()
以上是关于kubeedge设备添加以及mapper管理的主要内容,如果未能解决你的问题,请参考以下文章