当 EMQ X 遇见一维卷积神经网络:探索 AIoT 应用新可能
Posted EMQX
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了当 EMQ X 遇见一维卷积神经网络:探索 AIoT 应用新可能相关的知识,希望对你有一定的参考价值。
提起物联网(IoT)和人工智能(AI),人们并不陌生。作为当今时代十分热门的科技概念,它们其实都与「数据」有关:IoT 解决了数据从哪里来,AI 则解决了数据去往何方、用于何处。一个将两者结合的新概念「AIoT」也应运而生:IoT 通过万物连接与通信为 AI 提供海量数据,AI 则通过对数据的不断学习与分析,将其转化为有效信息,为实际领域提供效用。
在本文中,我们将提出 AIoT 的一个简单融合应用:利用物联网消息中间件 EMQ X Broker 收集液压系统温度传感器数据,并将其转发到一维卷积神经网络 (1D CNN) ,利用这一 AI 深度学习的代表算法预测液压系统冷却器状态。
在一维卷积神经网络上,时间将被看做一个空间纬度,每个输出时间步都是利用输入序列在时间维度上的一小段得到的,为此我们可以利用该特性实现时序数据的预测。我们将使用 Python 代码模拟温度传感器时序数据,通过 MQTT 协议传输到 EMQ X Broker,并利用其灵活的规则引擎将数据转发到 webhook,依据输入的温度传感器时序数据实现当前液压系统冷却器的状态预测。
在本文中我们将使用 UCI 机器学习与智能系统中心提供的液压系统状态监测数据集,该数据集是在液压试验台上实验获得。该试验台由一级工作回路和二级冷却过滤回路组成,二级冷却过滤回路通过油箱相连。该系统周期性地重复负载循环 (60秒) ,通过改变四个液压元件(冷却器、阀门、泵和蓄能器)的状态,获取压力、体积流量和温度等过程值。
在该数据集中,TS1.txt, TS2.txt, TS3.txt, TS4.txt 分别为 4 个液压系统的冷却器温度传感器以 60 秒一个周期所获取到的温度数据,第一个周期传感器温度数据如下图:
profile.txt 第一列表示当前周期内液压系统冷却器状态
3:接近故障 (close to total failure)
20:低效率 (reduced efficiency)
100:全效率 (full efficiency)
一维卷积神经网络模型构建
-
num_sensors = 4
TIME_PERIODS = 60
BATCH_SIZE = 16
EPOCHS = 10
model_m = Sequential()
model_m.add(Conv1D(100, 6, activation='relu', input_shape=(TIME_PERIODS, num_sensors)))
model_m.add(Conv1D(100, 6, activation='relu'))
model_m.add(MaxPooling1D(3))
model_m.add(Conv1D(160, 6, activation='relu'))
model_m.add(Conv1D(160, 6, activation='relu'))
model_m.add(GlobalAveragePooling1D(name='G_A_P_1D'))
model_m.add(Dropout(0.5))
model_m.add(Dense(3, activation='softmax'))
print(model_m.summary())
model_m.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
history = model_m.fit(X_train, y_train, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.2, verbose=2)
模型分类指标的报告
从报告中可以看出通过温度数据预测冷却器状态 3 (接近故障),20 (低效率),100 (全效率) 准确率分别为 95%,80%,89%
模拟数据输入
在本文中我们将模拟生产环境下冷却器温度传感器数据上报,为此我们将使用 Python 代码读取数据集中温度数据,并通过 MQTT 协议上报到 EMQ X Broker。
在下面代码中我们首先使用 pandas 读取数据集中温度数据 ('TS1.txt', 'TS2.txt', 'TS3.txt', 'TS4.txt'),并对数据做简单处理,然后将数据每秒上报到 EMQ X Broker。
import json
import time
import pandas as pd
from paho.mqtt import client as mqtt_client
broker = '127.0.0.1'
port = 1883
topic = "/1dcnn"
client_id = f'1dcnn-client'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def load_data():
names = ['TS1.txt', 'TS2.txt', 'TS3.txt', 'TS4.txt']
df = pd.DataFrame()
for name in names:
data_file = f'./dataset/{name}'
read_df = pd.read_csv(data_file, sep='\t', header=None)
df = df.append(read_df)
df = df.sort_index()
df_values = df.values
df = df_values.reshape(-1, 4, len(df.columns))
data = df.transpose(0, 2, 1)
return data
def publish(client):
data = load_data()
for x_data in data[-10:]:
for y_data in x_data:
t_1, t_2, t_3, t_4 = tuple(y_data)
msg = {
't1': round(t_1, 3),
't2': round(t_2, 3),
't3': round(t_3, 3),
't4': round(t_4, 3)
}
time.sleep(1)
result = client.publish(topic, json.dumps(msg))
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
故障预测
在本文中我们将使用 EMQ X Broker 规则引擎将温度传感器数据转发到 webhook,并通过温度传感器采集的数据,实现对冷却器状态预测。
Webhook 代码编写
import asyncio
import json
import numpy as np
import uvicorn
from keras.models import load_model
from sklearn.preprocessing import StandardScaler
from starlette.applications import Starlette
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse
app = Starlette()
queue = asyncio.Queue()
model = load_model('./1d-cnn.h5')
async def on_startup():
print('startup webhook')
async def webhook(request):
request_dict = await request.json()
payload = request_dict['payload']
data = json.loads(payload)
values = list(data.values())
if queue.qsize() == 60:
items = clear_queue(queue)
task = BackgroundTask(predictive, data=items)
else:
task = None
queue.put_nowait(values)
record = {'status': 'success'}
return JSONResponse(record, status_code=201, background=task)
async def predictive(data):
y_label = {
0: 3,
1: 20,
2: 100
}
y_status = {
3: 'close to total failure',
20: 'reduced efficiency',
100: 'full efficiency'
}
x_test = np.array(data)
scaler = StandardScaler()
x_test = scaler.fit_transform(x_test.reshape(-1, x_test.shape[-1])).reshape(x_test.shape)
x_test = x_test.reshape(-1, x_test.shape[0], x_test.shape[1])
results = model.predict(x_test)
msg = "Current cooler state probability: "
for i, probability in enumerate(results[0]):
status = y_status[y_label[i]]
msg += f"{probability * 100:.2f}% {status}({y_label[i]}), "
print(msg)
def clear_queue(q):
items = []
while not q.empty():
items.append(q.get_nowait())
return items
if __name__ == '__main__':
uvicorn.run(
app,
host='127.0.0.1',
port=8080,
loop='uvloop',
log_level='warning'
)
2. EMQ X Broker 资源创建
访问 EMQ X Dashboard,登录用户名和密码为 admin, public,点击左侧菜单栏规则 -> 资源,创建资源。
3. EMQ X Broker 规则创建
启动 Webhook
python3 webhook.py
启动 EMQ X Broker
./bin/emqx start
模拟数据输入
python publish.py
查看液压系统冷却器状态预测结果
从下图中我们可以看出前五个周期内,通过输入传感器温度预测出当前冷却器状态为接近故障(close to total failure),这与数据集中给出的冷却器状态一致
分别调整输入数据,查看不同温度下冷却器状态预测结果,并和数据集中实验结果做对比
输入前十个周期内温度传感器数据,查看预测结果并与实验台收集的结果做对比
修改 publish.py 文件中: for x_data in data: -> for x_data in data[:10]:
从上图中我们可以看到预测结果与试验台收集结果一致
选择数据集中状态为 3 接近故障(close to total failure),20 低效率(reduced efficiency) 的数据作为输入,查看预测结果并与实验台收集的结果做对比
修改 publish.py 文件中: for x_data in data: -> for x_data in data[728:737]:
从上图中我们可以看到预测结果与实验台收集结果有一定误差,这也验证了模型分类指标的报告中预测准确性概率
输入后十个周期内温度传感器数据,查看预测结果并与实验台收集的结果做对比
修改 publish.py 文件中: for x_data in data: -> for x_data in data[-10:]:
从上图中我们可以看到预测结果与试验台收集结果大致一致,但还是存在一定偏差
至此我们实现了传感器数据上报,利用 EMQ X Broker 规则引擎实现数据转发,并使用一维卷积神经网络 (1D CNN) 实现了液压系统冷却器故障预测。
您可以通过以下方式了解 EMQ X 系列产品:
点击"阅读原文" ,了解更多。
↓↓↓
以上是关于当 EMQ X 遇见一维卷积神经网络:探索 AIoT 应用新可能的主要内容,如果未能解决你的问题,请参考以下文章