前端开发想了解机器学习?用一台Mac就可以
Posted 淘系技术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了前端开发想了解机器学习?用一台Mac就可以相关的知识,希望对你有一定的参考价值。
如果想了解用机器学习是怎么解决实际问题的,可以看这篇: 从问题定义、算法选型、样本准备、模型训练、模型评估、模型服务部署、到模型应用都有介绍。
环境准备
▐ 安装 Anaconda
source ~/.bashrc
cat ~/.bashrc
conda list
▐ 安装相关依赖
pip install keras
pip install tensorflow
pip install opencv-python
样本准备
模型训练
▐ 开发训练逻辑
.
├── CNN_net.py
├── dataset
├── nn_train.py
└── utils_paths.py
# nn_train.py
from CNN_net import SimpleVGGNet
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from keras.optimizers import SGD
from keras.preprocessing.image import ImageDataGenerator
import utils_paths
import matplotlib.pyplot as plt
from cv2 import cv2
import numpy as np
import argparse
import random
import pickle
import os
# 读取数据和标签
print("------开始读取数据------")
data = []
labels = []
# 拿到图像数据路径,方便后续读取
imagePaths = sorted(list(utils_paths.list_images('./dataset')))
random.seed(42)
random.shuffle(imagePaths)
image_size = 256
# 遍历读取数据
for imagePath in imagePaths:
# 读取图像数据
image = cv2.imread(imagePath)
image = cv2.resize(image, (image_size, image_size))
data.append(image)
# 读取标签
label = imagePath.split(os.path.sep)[-2]
labels.append(label)
data = np.array(data, dtype="float") / 255.0
labels = np.array(labels)
# 数据集切分
(trainX, testX, trainY, testY) = train_test_split(data,labels, test_size=0.25, random_state=42)
# 转换标签为one-hot encoding格式
lb = LabelBinarizer()
trainY = lb.fit_transform(trainY)
testY = lb.transform(testY)
# 数据增强处理
aug = ImageDataGenerator(
rotation_range=30,
width_shift_range=0.1,
height_shift_range=0.1,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True,
fill_mode="nearest")
# 建立卷积神经网络
model = SimpleVGGNet.build(width=256, height=256, depth=3,classes=len(lb.classes_))
# 设置初始化超参数
# 学习率
INIT_LR = 0.01
# Epoch
# 这里设置 5 是为了能尽快训练完毕,可以设置高一点,比如 30
EPOCHS = 5
# Batch Size
BS = 32
# 损失函数,编译模型
print("------开始训练网络------")
opt = SGD(lr=INIT_LR, decay=INIT_LR / EPOCHS)
model.compile(loss="categorical_crossentropy", optimizer=opt,metrics=["accuracy"])
# 训练网络模型
H = model.fit_generator(
aug.flow(trainX, trainY, batch_size=BS),
validation_data=(testX, testY),
steps_per_epoch=len(trainX) // BS,
epochs=EPOCHS
)
# 测试
print("------测试网络------")
predictions = model.predict(testX, batch_size=32)
print(classification_report(testY.argmax(axis=1), predictions.argmax(axis=1), target_names=lb.classes_))
# 绘制结果曲线
N = np.arange(0, EPOCHS)
plt.style.use("ggplot")
plt.figure()
plt.plot(N, H.history["loss"], label="train_loss")
plt.plot(N, H.history["val_loss"], label="val_loss")
plt.plot(N, H.history["accuracy"], label="train_acc")
plt.plot(N, H.history["val_accuracy"], label="val_acc")
plt.title("Training Loss and Accuracy")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend()
plt.savefig('./output/cnn_plot.png')
# 保存模型
print("------保存模型------")
model.save('./cnn.model.h5')
f = open('./cnn_lb.pickle', "wb")
f.write(pickle.dumps(lb))
f.close()
▐ 开始训练
python nn_train.py
模型评估
python predict.py
# predict.py
import allspark
import io
import numpy as np
import json
from PIL import Image
import requests
import threading
import cv2
import os
import tensorflow as tf
from tensorflow.keras.models import load_model
import time
model = load_model('./train/cnn.model.h5')
# pred的输入应该是一个images的数组,而且图片都已经转为numpy数组的形式
# pred = model.predict(['./validation/button/button-demoplus-20200216-16615.png'])
#这个顺序一定要与label.json顺序相同,模型输出是一个数组,取最大值索引为预测值
Label = [
"button",
"keyboard",
"searchbar",
"switch"
]
testPath = "./test/button.png"
images = []
image = cv2.imread(testPath)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = cv2.resize(image,(256,256))
images.append(image)
images = np.asarray(images)
pred = model.predict(images)
print(pred)
max_ = np.argmax(pred)
print('预测结果为:',Label[max_])
模型服务部署
▐ 开发模型服务
# 模型服务 app.py
import allspark
import io
import numpy as np
import json
from PIL import Image
import requests
import threading
import cv2
import tensorflow as tf
from tensorflow.keras.models import load_model
with open('label.json') as f:
mp = json.load(f)
labels = {value:key for key,value in mp.items()}
def create_opencv_image_from_stringio(img_stream, cv2_img_flag=-1):
img_stream.seek(0)
img_array = np.asarray(bytearray(img_stream.read()), dtype=np.uint8)
image_temp = cv2.imdecode(img_array, cv2_img_flag)
if image_temp.shape[2] == 4:
image_channel3 = cv2.cvtColor(image_temp, cv2.COLOR_BGRA2BGR)
image_mask = image_temp[:,:,3] #.reshape(image_temp.shape[0],image_temp.shape[1], 1)
image_mask = np.stack((image_mask, image_mask, image_mask), axis = 2)
index_mask = np.where(image_mask == 0)
image_channel3[index_mask[0], index_mask[1], index_mask[2]] = 255
return image_channel3
else:
return image_temp
def get_string_io(origin_path):
r = requests.get(origin_path, timeout=2)
stringIo_content = io.BytesIO(r.content)
return stringIo_content
def handleReturn(pred, percent, msg_length):
result = {
"content":[]
}
argm = np.argsort(-pred, axis = 1)
for i in range(msg_length):
label = labels[argm[i, 0]]
index = argm[i, 0]
if(pred[i, index] > percent):
confident = True
else:
confident = False
result['content'].append({'isConfident': confident, 'label': label})
return result
def process(msg, model):
msg_dict = json.loads(msg)
percent = msg_dict['threshold']
msg_dict = msg_dict['images']
msg_length = len(msg_dict)
desire_size = 256
images = []
for i in range(msg_length):
image_temp = create_opencv_image_from_stringio(get_string_io(msg_dict[i]))
image_temp = cv2.cvtColor(image_temp, cv2.COLOR_BGR2RGB)
image = cv2.resize(image_temp, (256, 256))
images.append(image)
images = np.asarray(images)
pred = model.predict(images)
return bytes(json.dumps(handleReturn(pred, percent, msg_length)) ,'utf-8')
def worker(srv, thread_id, model):
while True:
msg = srv.read()
try:
rsp = process(msg, model)
srv.write(rsp)
except Exception as e:
srv.error(500,bytes('invalid data format', 'utf-8'))
if __name__ == '__main__':
desire_size = 256
model = load_model('./cnn.model.h5')
context = allspark.Context(4)
queued = context.queued_service()
workers = []
for i in range(10):
t = threading.Thread(target=worker, args=(queued, i, model))
t.setDaemon(True)
t.start()
workers.append(t)
for t in workers:
t.join()
▐ 部署模型服务
.
├── app.py
├── cnn.model.h5
└── label.json
✎ 安装环境
安装 Docker
用 Homebrew 安装 需要先现状 Homebrew: https://brew.sh
brew cask install docker
创建 anaconda 的虚拟环境
# 使用conda创建python环境,目录需指定固定名字:ENV
$ conda create -p ENV python=3.7
# 安装EAS python sdk
$ ENV/bin/pip install http://eas-data.oss-cn-shanghai.aliyuncs.com/sdk/allspark-0.9-py2.py3-none-any.whl
# 安装其它依赖包
$ ENV/bin/pip install tensorflow keras opencv-python
# 激活虚拟环境
$ conda activate ./ENV
# 退出虚拟环境(不使用时)
$ conda deactivate
运行 Docker 环境
sudo docker run -ti -v /Users/chang/Desktop/ml-test/deploy-project:/home -p 8080:8080
registry.cn-shanghai.aliyuncs.com/eas/eas-python-base-image:py3.6-allspark-0.8
✎ 本地部署
cd /home
./ENV/bin/python app.py
curl -X POST 'localhost:8080/predict'
-H 'Content-Type: application/json'
-d '{
"images": ["https://img.alicdn.com/tfs/TB1W8K2MeH2gK0jSZJnXXaT1FXa-638-430.png"],
"threshold": 0.5
}'
{"content": [{"isConfident": true, "label": "keyboard"}]}
完整代码
# 1、训练模型
$ cd train-project
$ python nn_train.py
# 生成模型文件:cnn.model.h5
# 2、将模型文件拷贝到 deploy-project 中,部署模型服务
# 先安装模型服务运行环境
$ conda activate ./ENV
$ sudo docker run -ti -v /Users/chang/Desktop/ml-test/deploy-project:/home -p 8080:8080 registry.cn-shanghai.aliyuncs.com/eas/eas-python-base-image:py3.6-allspark-0.8
$ cd /home
$ ./ENV/bin/python app.py
# 得到模型服务 API:localhost:8080/predict
# 3、访问模型服务
curl -X POST 'localhost:8080/predict'
-H 'Content-Type: application/json'
-d '{
"images": ["https://img.alicdn.com/tfs/TB1W8K2MeH2gK0jSZJnXXaT1FXa-638-430.png"],
"threshold": 0.5
}'
最后
以上是关于前端开发想了解机器学习?用一台Mac就可以的主要内容,如果未能解决你的问题,请参考以下文章