如何使用 mlflow.pyfunc.log_model() 通过 Keras 步骤记录 sklearn 管道?类型错误:无法腌制 _thread.RLock 对象

Posted

技术标签:

【中文标题】如何使用 mlflow.pyfunc.log_model() 通过 Keras 步骤记录 sklearn 管道?类型错误:无法腌制 _thread.RLock 对象【英文标题】:How to log a sklearn pipeline with a Keras step using mlflow.pyfunc.log_model()? TypeError: can't pickle _thread.RLock objects 【发布时间】:2021-02-15 02:59:38 【问题描述】:

我想通过 Keras 步骤登录到 MlFlow sklearn 管道。

管道有 2 个步骤:sklearn StandardScale 和 Keras TensorFlow 模型。

我正在使用 mlflow.pyfunc.log_model() 作为可能的解决方案,但我遇到了这个错误:

TypeError: can't pickle _thread.RLock objects
--->   mlflow.pyfunc.log_model("test1", python_model=wrappedModel, signature=signature)

这是我的代码:

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import keras
from keras import layers, Input
from keras.wrappers.scikit_learn import KerasRegressor
import mlflow.pyfunc
from sklearn.pipeline import Pipeline
from mlflow.models.signature import infer_signature

#toy dataframe
df1 = pd.DataFrame([[1,2,3,4,5,6], [10,20,30,40,50,60],[100,200,300,400,500,600]] )

#create train test datasets
X_train, X_test = train_test_split(df1, random_state=42, shuffle=True)

#scale X_train
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_train_s = pd.DataFrame(X_train_s)

#wrap the keras model to use it inside of sklearn pipeline
def create_model(optimizer='adam', loss='mean_squared_error', s = X_train.shape[1]):
  input_layer = keras.Input(shape=(s,))
  # "encoded" is the encoded representation of the input
  encoded = layers.Dense(25, activation='relu')(input_layer)
  encoded = layers.Dense(2, activation='relu')(encoded)

  # "decoded" is the lossy reconstruction of the input
  decoded = layers.Dense(2, activation='relu')(encoded)
  decoded = layers.Dense(25, activation='relu')(encoded)
  decoded = layers.Dense(s, activation='linear')(decoded)
  
  model = keras.Model(input_layer, decoded)
  model.compile(optimizer, loss)
  return model

# wrap the model
model = KerasRegressor(build_fn=create_model, verbose=1)

# create the pipeline
pipe = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('model',model)
])

#function to wrap the pipeline to be logged by mlflow
class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
  def __init__(self, model):
    self.model = model
    
  def predict(self, context, model_input):
    return self.model.predict(model_input)[:,1]
  
  
mlflow.end_run()
with mlflow.start_run(run_name='test1'):

  #train the pipeline
  pipe.fit(X_train, X_train_s, model__epochs=2)
  
  #wrap the model for mlflow log
  wrappedModel = SklearnModelWrapper(pipe)

  # Log the model with a signature that defines the schema of the model's inputs and outputs. 
  signature = infer_signature(X_train, wrappedModel.predict(None, X_train))
  mlflow.pyfunc.log_model("test1", python_model=wrappedModel, signature=signature)
  

从我的谷歌搜索来看,这种类型的错误似乎与线程的并发性有关。然后它可能与 TensorFlow 相关,因为它在模型训练阶段分发代码。

但是,有问题的代码行是在训练阶段之后。如果我删除这一行,其余代码就可以工作,这让我认为它发生在模型训练的并发阶段之后。我不知道为什么在这种情况下会收到此错误。 我是初学者?有人可以帮帮我吗? 谢谢

【问题讨论】:

您找到解决方案了吗?我遇到了同样的错误! 【参考方案1】:

python_model=wrappedModel 中应该是python_model=SklearnModelWrapper() 我觉得

【讨论】:

以上是关于如何使用 mlflow.pyfunc.log_model() 通过 Keras 步骤记录 sklearn 管道?类型错误:无法腌制 _thread.RLock 对象的主要内容,如果未能解决你的问题,请参考以下文章

如何使用本机反应创建登录以及如何验证会话

如何在自动布局中使用约束标识符以及如何使用标识符更改约束? [迅速]

如何使用 AngularJS 的 ng-model 创建一个数组以及如何使用 jquery 提交?

如何使用laravel保存所有行数据每个行名或相等

如何使用 Math.Net 连接矩阵。如何使用 Math.Net 调用特定的行或列?

WSARecv 如何使用 lpOverlapped?如何手动发出事件信号?