如何将您安装的变压器保存到 blob 中,以便您的预测管道可以在 AML 服务中使用它?
Posted
技术标签:
【中文标题】如何将您安装的变压器保存到 blob 中,以便您的预测管道可以在 AML 服务中使用它?【英文标题】:How to save your fitted transformer into blob, so your prediction pipeline can use it in AML Service? 【发布时间】:2019-10-26 18:03:43 【问题描述】:我正在 Azure 机器学习服务上构建数据转换和训练管道。我想将我安装的变压器(例如 tf-idf)保存到 blob,以便我的预测管道稍后可以访问它。
transformed_data = PipelineData("transformed_data",
datastore = default_datastore,
output_path_on_compute="my_project/tfidf")
step_tfidf = PythonScriptStep(name = "tfidf_step",
script_name = "transform.py",
arguments = ['--input_data', blob_train_data,
'--output_folder', transformed_data],
inputs = [blob_train_data],
outputs = [transformed_data],
compute_target = aml_compute,
source_directory = project_folder,
runconfig = run_config,
allow_reuse = False)
以上代码将转换器保存到当前运行的文件夹中,该文件夹在每次运行期间动态生成。
我想将转换器保存到 blob 上的固定位置,以便稍后在调用预测管道时访问它。
我尝试使用DataReference
类的实例作为PythonScriptStep
输出,但它会导致错误:
ValueError: Unexpected output type: <class 'azureml.data.data_reference.DataReference'>
这是因为PythonScriptStep
只接受PipelineData
或OutputPortBinding
对象作为输出。
我怎样才能保存我安装的变压器,以便以后可以通过任何任意过程(例如我的预测管道)访问它?
【问题讨论】:
【参考方案1】:另一种解决方案是将DataReference
作为输入传递给您的PythonScriptStep
。
然后在transform.py
中,您可以将DataReference
作为命令行参数读取。
您可以解析它并将其用作任何常规路径来保存矢量化器。
例如你可以:
step_tfidf = PythonScriptStep(name = "tfidf_step",
script_name = "transform.py",
arguments = ['--input_data', blob_train_data,
'--output_folder', transformed_data,
'--transformer_path', trained_transformer_path],
inputs = [blob_train_data, trained_transformer_path],
outputs = [transformed_data],
compute_target = aml_compute,
source_directory = project_folder,
runconfig = run_config,
allow_reuse = False)
然后在你的脚本中(上面例子中的transform.py
)你可以例如:
import argparse
import joblib as jbl
import os
from sklearn.feature_extraction.text import TfidfVectorizer
parser = argparse.ArgumentParser()
parser.add_argument('--transformer_path', dest="transformer_path", required=True)
args = parser.parse_args()
tfidf = ### HERE CREATE AND TRAIN YOUR VECTORIZER ###
vect_filename = os.path.join(args.transformer_path, 'my_vectorizer.jbl')
EXTRA:第三种方法是将矢量化器注册为工作区中的另一个模型。然后,您可以像使用任何其他注册模型一样使用它。 (尽管此选项不涉及显式写入 blob - 如上述问题中所述)
【讨论】:
嗨@PythoLove,上述方法对我有用。您遇到了什么错误?【参考方案2】:另一个选项是使用DataTransferStep
并将输出复制到“已知位置”。 This notebook 提供了使用 DataTransferStep 从各种受支持的数据存储复制数据的示例。
from azureml.data.data_reference import DataReference
from azureml.exceptions import ComputeTargetException
from azureml.core.compute import ComputeTarget, DataFactoryCompute
from azureml.pipeline.steps import DataTransferStep
blob_datastore = Datastore.get(ws, "workspaceblobstore")
blob_data_ref = DataReference(
datastore=blob_datastore,
data_reference_name="knownloaction",
path_on_datastore="knownloaction")
data_factory_name = 'adftest'
def get_or_create_data_factory(workspace, factory_name):
try:
return DataFactoryCompute(workspace, factory_name)
except ComputeTargetException as e:
if 'ComputeTargetNotFound' in e.message:
print('Data factory not found, creating...')
provisioning_config = DataFactoryCompute.provisioning_configuration()
data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)
data_factory.wait_for_completion()
return data_factory
else:
raise e
data_factory_compute = get_or_create_data_factory(ws, data_factory_name)
# Assuming output data is your output from the step that you want to copy
transfer_to_known_location = DataTransferStep(
name="transfer_to_known_location",
source_data_reference=[output_data],
destination_data_reference=blob_data_ref,
compute_target=data_factory_compute
)
from azureml.pipeline.core import Pipeline
from azureml.core import Workspace, Experiment
pipeline_01 = Pipeline(
description="transfer_to_known_location",
workspace=ws,
steps=[transfer_to_known_location])
pipeline_run_01 = Experiment(ws, "transfer_to_known_location").submit(pipeline_01)
pipeline_run_01.wait_for_completion()
【讨论】:
【参考方案3】:这可能不够灵活,无法满足您的需求(另外,我还没有对此进行测试),但如果您使用 scikit-learn,一种可能性是将 tf-idf/transformation 步骤包含在 scikit-learn @ 987654322@ 对象并将其注册到您的工作区。
因此,您的训练脚本将包含:
pipeline = Pipeline([
('vectorizer', TfidfVectorizer(stop_words = list(text.ENGLISH_STOP_WORDS))),
('classifier', SGDClassifier()
])
pipeline.fit(train[label].values, train[pred_label].values)
# Serialize the pipeline
joblib.dump(value=pipeline, filename='outputs/model.pkl')
您的实验提交脚本将包含
run = exp.submit(src)
run.wait_for_completion(show_output = True)
model = run.register_model(model_name='my_pipeline', model_path='outputs/model.pkl')
然后,您可以使用注册的“模型”并将其部署为explained in the documentation 的服务,方法是将其加载到评分脚本中
model_path = Model.get_model_path('my_pipeline')
# deserialize the model file back into a sklearn model
model = joblib.load(model_path)
但是,这会在您的管道中进行转换,因此不会像您要求的那样模块化...
【讨论】:
谢谢你,@Davide。我认为这是个好主意!去测试一下。以上是关于如何将您安装的变压器保存到 blob 中,以便您的预测管道可以在 AML 服务中使用它?的主要内容,如果未能解决你的问题,请参考以下文章