在 Kubeflow Pipelines 中,如何将元素列表发送到轻量级 python 组件?

Posted

技术标签:

【中文标题】在 Kubeflow Pipelines 中,如何将元素列表发送到轻量级 python 组件?【英文标题】:In Kubeflow Pipelines, how to send a list of elements to a lightweight python component? 【发布时间】:2020-01-08 09:59:43 【问题描述】:

我正在尝试将元素列表作为 PipelineParameter 发送到轻量级组件。 这是重现该问题的示例。这是函数:

def my_func(my_list: list) -> bool:
    print(f'my_list is my_list')
    print(f'my_list is of type type(my_list)')
    print(f'elem 0 is my_list[0]')
    print(f'elem 1 is my_list[1]')
    return True

如果我用这个来执行它:

test_data = ['abc', 'def']
my_func(test_data)

它的行为符合预期:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

但如果我将它包装在一个操作中并设置一个管道:

import kfp

my_op = kfp.components.func_to_container_op(my_func)

@kfp.dsl.pipeline()
def my_pipeline(my_list: kfp.dsl.PipelineParam = kfp.dsl.PipelineParam('my_list', param_type=kfp.dsl.types.List())):
    my_op(my_list)

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

然后运行一个管道:

client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params='my_list': test_data)

然后似乎我的列表在某些时候被转换为字符串!

my_list is ['abc', 'def']
my_list is of type <class 'str'>
elem 0 is [
elem 1 is '

【问题讨论】:

【参考方案1】:

这是我发现的一种解决方法,将参数序列化为 json 字符串。不确定这真的是最好的方法...

裸函数变为:

def my_func(json_arg_str: str) -> bool:
    import json
    args = json.loads(json_arg_str)
    my_list = args['my_list']
    print(f'my_list is my_list')
    print(f'my_list is of type type(my_list)')
    print(f'elem 0 is my_list[0]')
    print(f'elem 1 is my_list[1]')
    return True

只要您将 args 作为 json 字符串而不是列表传递,这仍然有效:

test_data = '"my_list":["abc", "def"]' my_func(test_data)

产生预期结果:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

现在管道已更改为接受 str 而不是 PipelineParam 类型的 kfp.dsl.types.List

import kfp 

my_op = kfp.components.func_to_container_op(my_func)

@kfp.dsl.pipeline()
def my_pipeline(json_arg_str: str):
    my_op(json_arg_str)

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

当这样执行时:

client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params='json_arg_str': test_data)

产生相同的结果:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

虽然它有效,但我仍然觉得这种解决方法很烦人。如果不允许作为列表的 PipelineParam,那么kfp.dsl.types.List 的意义何在?

【讨论】:

【参考方案2】:

目前最好的选择似乎是序列化参数。有一个问题与此相关:https://github.com/kubeflow/pipelines/issues/1901

【讨论】:

以上是关于在 Kubeflow Pipelines 中,如何将元素列表发送到轻量级 python 组件?的主要内容,如果未能解决你的问题,请参考以下文章

使用带有 Python 和 PyCharm 的 Kubeflow Pipelines SDK 连接到 AI Platform Pipelines

使用 kfp.dls.containerOp() 在 Kubeflow Pipelines 上运行多个脚本

在 Vertex AI 上使用 Tesla A100 GPU 和 Kubeflow Pipelines

在 Kubeflow 中运行自定义容器时,如何将参数传递给容器?

如何在 kubeflow 管道中传递环境变量?

使用Kubeflow构建机器学习流水线