如何将 AzureDatafactory 中的 DataPath PipelineParameter 传递给 AzureMachineLearningExecutePipeline Activity?

Posted

技术标签:

【中文标题】如何将 AzureDatafactory 中的 DataPath PipelineParameter 传递给 AzureMachineLearningExecutePipeline Activity?【英文标题】:How to pass a DataPath PipelineParameter from AzureDatafactory to AzureMachineLearningExecutePipeline Activity? 【发布时间】:2020-12-23 01:59:14 【问题描述】:

我正在尝试从 Blob Storage 读取文件,加载到 pandas 并将其写入 BlobStorage

我有一个带有 PythonScriptStep 的 Azure 机器学习管道,它采用 2 个 PipelineParameters 并且是如下的 DataPath。

from azureml.core import Datastore
from azureml.data.datapath import DataPath, DataPathComputeBinding, DataReference
from azureml.pipeline.core import PipelineParameter

datastore = Datastore(ws, "SampleStore")
in_raw_path_default = 'somefolder/raw/alerts/2020/08/03/default_in.csv'
in_cleaned_path_default= 'somefolder/cleaned/alerts/2020/08/03/default_out.csv'

in_raw_datapath = DataPath(datastore=datastore, path_on_datastore=in_raw_path_default)
in_raw_path_pipelineparam = PipelineParameter(name="inrawpath", default_value=in_raw_datapath)
raw_datapath_input = (in_raw_path_pipelineparam, DataPathComputeBinding(mode='mount'))

in_cleaned_datapath = DataPath(datastore=datastore, path_on_datastore=in_cleaned_path_default)
in_cleaned_path_pipelineparam = PipelineParameter(name="incleanedpath", default_value=in_cleaned_datapath)
cleaned_datapath_input = (in_cleaned_path_pipelineparam, DataPathComputeBinding(mode='mount'))

from azureml.pipeline.steps import PythonScriptStep

source_directory = script_folder + '/pipeline_Steps'
dataprep_step = PythonScriptStep(
    script_name="SimpleTest.py", 
    arguments=["--input_data", raw_datapath_input, "--cleaned_data", cleaned_datapath_input],
    inputs=[raw_datapath_input, cleaned_datapath_input],    
    compute_target=default_compute, 
    source_directory=source_directory,
    runconfig=run_config,
    allow_reuse=True
)

from azureml.pipeline.core import Pipeline
pipeline_test = Pipeline(workspace=ws, steps=[dataprep_step])

test_raw_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/raw/alerts/2017/05/31/test.csv')
test_cleaned_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/cleaned/alerts/2020/09/03')
pipeline_run_msalerts = Experiment(ws, 'SampleExperiment').submit(pipeline_test, pipeline_parameters="inrawpath"  : test_raw_path,
                                                                                                        "incleanedpath" : test_cleaned_path)```

这是使用的脚本(SimpleTest.py):

import os
import sys
import argparse
import pathlib
import azureml.core
import pandas as pd

parser = argparse.ArgumentParser("datapreponly")
parser.add_argument("--input_data", type=str)
parser.add_argument("--cleaned_data", type=str)

args = parser.parse_args()

print("Argument 1: %s" % args.input_data)
print("Argument 2: %s" % args.cleaned_data)

testDf = pd.read_csv(args.input_data, error_bad_lines=False)
print('Total Data Shape' + str(testDf.shape))

if not (args.cleaned_data is None):
    output_path = args.cleaned_data
    os.makedirs(output_path, exist_ok=True)
    outdatapath = output_path + '/alert.csv'    
    testDf.to_csv(outdatapath, index=False)

从 AzureDataFactory 触发此 AzureMLipeline: 通过在 AzureMLWorkspace/PipelineSDK 中执行 ML 管道,上述代码可以正常工作。我正在尝试从 AzureDataFactory(AzureMachineLearningExecutePipeline) 活动触发 AzureMLpipeline,如下所示

通过传递 2 个字符串输入路径尝试如下调试运行 rawdatapath = "samplefolder/raw/alerts/2017/05/31/test.csv" cleandatapath = "samplefolder/raw/cleaned/2020/09/03/"

Current directory:  /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/workspaceblobstore/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade
Preparing to call script [ SimpleTest.py ] 
with arguments:
 ['--input_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv',
 '--cleaned_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']
After variable expansion, calling script [ SimpleTest.py ] with arguments:
 ['--input_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv',
 '--cleaned_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']

Script type = None
Argument 1: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv
Argument 2: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv
.......................
FileNotFoundError: [Errno 2] No such file or directory: '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv'

它表明采用默认路径而不是管道参数(没有这样的文件或目录错误不太重要,因为要点是采用默认路径而不是管道参数)。我怀疑它是因为将管道参数作为字符串而不是数据路径传递。最后的问题:如何将数据路径从 Azure 数据工厂传递给 AzureMLipelineActivity?

谢谢。

【问题讨论】:

【参考方案1】:

输入参数好像被定义为字符串,请尝试将它们修改为对象数据类型。根据documentation,它需要对象 "Key" : "value" 参数。

【讨论】:

嘿@KranthiPakala-MSFT,感谢您的回复。您能否澄清数据类型必须是对象的位置? 1.“SimpleTest.py”中的参数应该是str还是object? 2. Azureml管道应该有什么变化吗? 3. 在 ADF AzureMLipeline 活动中,尝试将路径作为对象传递,但 AzureML 管道仍然选择默认值而不是传递的参数。我应该使用 getmetada 活动来创建对象还是仅使用字符串作为对象?感谢您的澄清【参考方案2】:

This notebook 演示了在 AML Pipeline 中使用 DataPathPipelineParameters。您将了解如何将字符串和DataPath 参数化并通过PipelineParameters 提交给AML Pipelines。您可以参数化输入数据集,这里是 sample 笔记本,它展示了如何做到这一点。

目前,ParallelRunStep 接受数据集作为数据输入。您可以在ParallelRunStep 之前再添加一个步骤来创建指向新数据的数据集对象并传递给ParallelRunStep。这是使用多个步骤的an example:

对于输出,如果使用append_row输出动作,可以通过append_row_file_nameconfig自定义输出文件名。输出将存储在默认 blob 中。要将其移至其他商店,我们建议在ParallelRunStep 之后使用另一个DataTransferStep。 请关注this example进行数据传输步骤:

【讨论】:

可能是我不太了解,当我触发注册管道时,如何将 path_on_datastore 动态传递给 DataPath 参数?请澄清,谢谢。 如果您按照上面的示例,管道将有一个输入管道参数,您可以在每次触发管道时插入数据存储名称和路径。【参考方案3】:

得到了微软的答复(请参阅此线程here)。 Azure 数据工厂产品团队确认,目前 Azure 数据工厂 (ADF) 中的“DataPath”参数不支持数据类型。但是,已经为此提出了一项功能,并且正在为此进行工作。此功能将成为 11 月版本的一部分。

【讨论】:

以上是关于如何将 AzureDatafactory 中的 DataPath PipelineParameter 传递给 AzureMachineLearningExecutePipeline Activity?的主要内容,如果未能解决你的问题,请参考以下文章

将数据从 Azure DWH 复制到 Azure 中的 Dynamics 365

Azure 存储表副本

Azure 数据工厂:如何从数据流转换中的流中获取第一行

Azure 数据工厂 - 尝试将参数添加到 REST API 请求正文中的动态内容

Azure 数据工厂-数据流-完成后-移动

Azure 数据工厂“等待验证”