How To Do Model Predict Using Distributed Dask With a Pre-Trained Keras Model?
Posted: 2020-09-07 12:16:50
Question: I am loading my pre-trained Keras model and then trying to use Dask to parallelize prediction over a large amount of input data. Unfortunately, I am running into some problems related to how I create my Dask array. Any guidance would be greatly appreciated!
Setup:
First, I cloned this repo: https://github.com/sanchit2843/dlworkshop.git
Reproducible code example:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
from keras import backend as K
from keras.models import Sequential
from keras.layers import Dense
import mlflow
import mlflow.keras
from dask.distributed import Client
import dask.array as da
import warnings
warnings.filterwarnings('ignore')
dataset = pd.read_csv('data/train.csv')
X = dataset.drop(['price_range'], axis=1).values
y = dataset[['price_range']].values
# scale data
sc = StandardScaler()
X = sc.fit_transform(X)
ohe = OneHotEncoder()
y = ohe.fit_transform(y).toarray()
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size = 0.2)
# Neural network
model = Sequential()
model.add(Dense(16, input_dim=20, activation="relu"))
model.add(Dense(12, activation="relu"))
model.add(Dense(4, activation="softmax"))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=100, batch_size=64)
# Use dask
client = Client()
def load_and_predict(input_data_chunk):
    # Custom loss that has to be passed to Keras so the saved model can be deserialized
    def contrastive_loss(y_true, y_pred):
        margin = 1
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    # Load the model logged by the most recent MLflow run (done once per chunk here)
    mlflow.set_tracking_uri('<uri>')
    mlflow.set_experiment('clean_parties_ml')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model', custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred
da_input_data = da.from_array(X_test, chunks=(100, None))
prediction_results = da_input_data.map_blocks(load_and_predict, dtype=X_test.dtype).compute()
The error I get:
AttributeError: '_thread._local' object has no attribute 'value'
Answer 1: Keras/TensorFlow does not play well with other threading systems. There is an ongoing issue on this topic: https://github.com/dask/dask-examples/issues/35
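A minimal sketch of the per-chunk prediction pattern, under stated assumptions. It shows two things: the model is loaded at most once per process through a cache rather than once per chunk, and `map_blocks` is told that each 100-row block of 20 features comes back as 4 class probabilities (assumed to match the `Dense(4, activation="softmax")` model trained in the question; the MLflow model's real output width is unknown). The "model" is a hypothetical NumPy stand-in for `mlflow.keras.load_model(...)`, and `get_model`, `predict_chunk` and `LOAD_COUNT` are illustrative names, not part of Keras, MLflow or Dask. It runs with `scheduler="synchronous"`, so no threads are involved at all. That sidesteps the threading problem but also gives no parallelism. On a cluster, one setting that may be worth trying (an assumption, not a confirmed fix for this error) is `Client(processes=True, threads_per_worker=1)`, so each worker process runs one task at a time.

```python
import functools

import dask.array as da
import numpy as np

N_FEATURES, N_CLASSES = 20, 4  # assumed input/output widths (the question's Sequential model)
LOAD_COUNT = 0  # counts how many times the stand-in model is "loaded"


@functools.lru_cache(maxsize=1)
def get_model():
    """Hypothetical stand-in for mlflow.keras.load_model(...).

    lru_cache keeps the loaded model for the lifetime of the process, so
    every chunk handled by this process reuses the same object.
    """
    global LOAD_COUNT
    LOAD_COUNT += 1
    weights = np.random.default_rng(0).standard_normal((N_FEATURES, N_CLASSES)).astype(np.float32)

    def predict(features):
        # Linear layer followed by a numerically stable softmax
        logits = features.astype(np.float32) @ weights
        logits -= logits.max(axis=1, keepdims=True)
        exp = np.exp(logits)
        return exp / exp.sum(axis=1, keepdims=True)

    return predict


def predict_chunk(chunk):
    # Called once per block; only the first call in a process pays the load cost
    return get_model()(chunk)


X_test = np.random.default_rng(1).standard_normal((400, N_FEATURES))
x = da.from_array(X_test, chunks=(100, None))  # 4 blocks of 100 rows x 20 features

preds = x.map_blocks(
    predict_chunk,
    dtype=np.float32,
    # Each output block keeps its rows but has N_CLASSES columns instead of N_FEATURES
    chunks=(x.chunks[0], (N_CLASSES,)),
    # Supplying meta stops Dask from calling predict_chunk on an empty array to infer it
    meta=np.empty((0, N_CLASSES), dtype=np.float32),
)
result = preds.compute(scheduler="synchronous")  # single-threaded, in this process
```

Here `result` is a (400, 4) float32 array whose rows sum to 1, and the stand-in model is loaded only once for all four blocks.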