如何连接 kubeflow 管道组件

Posted

技术标签:

【中文标题】如何连接 kubeflow 管道组件【英文标题】:How to connect kubeflow pipeline components 【发布时间】:2020-10-27 14:27:04 【问题描述】:

我想通过传递任何类型的数据来建立组件之间的管道连接,只是为了让它看起来像带有箭头的流程图一样有组织。现在就像下面

无论 docker 容器是否生成输出,我都希望在组件之间传递一些示例数据。但是,如果需要对 docker 容器代码或 .yaml 进行任何更改,请告诉我

KFP 代码

import os
from pathlib import Path
import requests

import kfp

#Load the component
component1 = kfp.components.load_component_from_file('comp_typed.yaml')
component2 = kfp.components.load_component_from_file('component2.yaml')
component3 = kfp.components.load_component_from_file('component3.yaml')
component4 = kfp.components.load_component_from_file('component4.yaml')

#Use the component as part of the pipeline
@kfp.dsl.pipeline(name='Document Processing Pipeline', description='Document Processing Pipeline')
def data_passing():
    task1 = component1()
    task2 = component2(task1.output)
    task3 = component3(task2.output)
    task4 = component4(task3.output)

comp_typed.yaml 代码

name: DPC
description: This is an example
implementation:
  container:
    image: gcr.io/pro1-in-us/dpc_comp1@sha256:3768383b9cd694936ef00464cb1bdc7f48bc4e9bbf08bde50ac7346f25be15de
    command: [python3, /dpc_comp1.py,]

component2.yaml

name: Custom_Plugin_1
description: This is an example
implementation:
  container:
    image: gcr.io/pro1-in-us/plugin1@sha256:16cb4aa9edf59bdf138177d41d46fcb493f84ce798781125dc7777ff5e1602e3
    command: [python3, /plugin1.py,]

我尝试了this 和this,但除了错误之外什么也做不了。我是 python 和 kubeflow 的新手。我应该进行哪些代码更改以使用 KFP SDK 在所有 4 个组件之间传递数据。数据可以是文件/字符串

假设,组件 1 从 gs 存储桶下载了一个 .pdf 文件,我可以将相同的文件提供给下一个下游组件吗?组件 1 将文件下载到组件 1 docker 容器的 '/tmp/doc_pages' 位置,我认为该容器是该特定容器的本地位置,并且下游组件无法读取它们?

【问题讨论】:

【参考方案1】:

This notebook,它描述了如何在 KFP 组件之间传递数据,可能会有用。它包含“小数据”的概念,直接传递;与您写入文件的“大数据”相比,然后(如示例笔记本中所示)输入和输出文件的路径由系统选择并传递给函数(作为字符串)。

如果您不需要在步骤之间传递数据,但想要指定步骤排序依赖项(例如,op2 直到 op1 完成后才会运行),您可以在管道定义中指出这一点,如下所示:

op2.after(op1)

【讨论】:

【参考方案2】:

除了艾米的出色回答:

您的管道是正确的。建立组件之间依赖关系的最好方法是建立数据依赖关系

让我们看看你的管道代码:

task2 = component2(task1.output)

您将task1 的输出传递给component2。这应该会产生您想要的依赖关系。但是有几个问题(如果您尝试编译,您的管道会显示编译错误):

    component1 需要有输出 component2 需要输入 component2 需要有一个输出(以便您可以将其传递给 component3)

等等

让我们添加它们:

name: DPC
description: This is an example
outputs:
- name: output_1
implementation:
  container:
    image: gcr.io/pro1-in-us/dpc_comp1@sha256:3768383b9cd694936ef00464cb1bdc7f48bc4e9bbf08bde50ac7346f25be15de
    command: [python3, /dpc_comp1.py, --output-1-path, outputPath: output_1]
name: Custom_Plugin_1
description: This is an example
inputs:
- name: input_1
outputs:
- name: output_1
implementation:
  container:
    image: gcr.io/pro1-in-us/plugin1@sha256:16cb4aa9edf59bdf138177d41d46fcb493f84ce798781125dc7777ff5e1602e3
    command: [python3, /plugin1.py, --input-1-path, inputPath: input_1, --output-1-path, outputPath: output_1]

通过这些更改,您的管道应该编译并显示您想要的依赖项。

请查看tutorial关于从命令行程序创建组件。

【讨论】:

谢谢!。我们可以并行运行 2 个管道组件吗?就像事件驱动一样.. 当组件 1 从 pdf 文档中拆分页面时,应该向组件 2 触发一个事件,以便它可以开始处理一些 OCR,而不是必须等待组件 1 完成整个 pdf 文档的拆分? 这是可能的(只有两个独立的组件使用 PubSub 等外部服务进行通信),但这不是受支持的配置。 KFP 目前是为批处理而设计的。【参考方案3】:

如果你不想通过输出使用依赖或者在组件之间传递任何数据,你可以参考上一步中的 PVC 来显式调用依赖。

示例: 您可以创建 PVC 来存储数据。

vop = dsl.VolumeOp(name="pvc",
                   resource_name="pvc", size=<size>, 
                   modes=dsl.VOLUME_MODE_RWO,)

在组件中使用它:

download = dsl.ContainerOp(name="download",image="", 
                           command=[" "], arguments=[" "], 
                           pvolumes="/data": vop.volume,)

现在您可以调用下载和训练之间的依赖关系,如下所示:

train = dsl.ContainerOp(name="train",image="", 
                        command=[" "], arguments=[" "], 
                        pvolumes="/data": download.pvolumes["/data"],)

【讨论】:

以上是关于如何连接 kubeflow 管道组件的主要内容,如果未能解决你的问题,请参考以下文章

如何扩展 kubeflow 管道(使用顶点 ai),或者它只是自动完成

是否可以将 kubeflow 组件与 tensorflow 扩展组件混合使用?

构建 kubeflow 组件的最佳选择是啥?

如何在 kubeflow 管道中传递环境变量?

在 Kubeflow 中运行自定义容器时,如何将参数传递给容器?

从 Kubeflow 管道运行中获取实验名称