如何连接 kubeflow 管道组件
Posted
技术标签:
【中文标题】如何连接 kubeflow 管道组件【英文标题】:How to connect kubeflow pipeline components 【发布时间】:2020-10-27 14:27:04 【问题描述】:我想通过传递任何类型的数据来建立组件之间的管道连接,只是为了让它看起来像带有箭头的流程图一样有组织。现在就像下面
无论 docker 容器是否生成输出,我都希望在组件之间传递一些示例数据。但是,如果需要对 docker 容器代码或 .yaml 进行任何更改,请告诉我
KFP 代码
import os
from pathlib import Path
import requests
import kfp
#Load the component
component1 = kfp.components.load_component_from_file('comp_typed.yaml')
component2 = kfp.components.load_component_from_file('component2.yaml')
component3 = kfp.components.load_component_from_file('component3.yaml')
component4 = kfp.components.load_component_from_file('component4.yaml')
#Use the component as part of the pipeline
@kfp.dsl.pipeline(name='Document Processing Pipeline', description='Document Processing Pipeline')
def data_passing():
task1 = component1()
task2 = component2(task1.output)
task3 = component3(task2.output)
task4 = component4(task3.output)
comp_typed.yaml 代码
name: DPC
description: This is an example
implementation:
container:
image: gcr.io/pro1-in-us/dpc_comp1@sha256:3768383b9cd694936ef00464cb1bdc7f48bc4e9bbf08bde50ac7346f25be15de
command: [python3, /dpc_comp1.py,]
component2.yaml
name: Custom_Plugin_1
description: This is an example
implementation:
container:
image: gcr.io/pro1-in-us/plugin1@sha256:16cb4aa9edf59bdf138177d41d46fcb493f84ce798781125dc7777ff5e1602e3
command: [python3, /plugin1.py,]
我尝试了this 和this,但除了错误之外什么也做不了。我是 python 和 kubeflow 的新手。我应该进行哪些代码更改以使用 KFP SDK 在所有 4 个组件之间传递数据。数据可以是文件/字符串
假设,组件 1 从 gs 存储桶下载了一个 .pdf 文件,我可以将相同的文件提供给下一个下游组件吗?组件 1 将文件下载到组件 1 docker 容器的 '/tmp/doc_pages' 位置,我认为该容器是该特定容器的本地位置,并且下游组件无法读取它们?
【问题讨论】:
【参考方案1】:This notebook,它描述了如何在 KFP 组件之间传递数据,可能会有用。它包含“小数据”的概念,直接传递;与您写入文件的“大数据”相比,然后(如示例笔记本中所示)输入和输出文件的路径由系统选择并传递给函数(作为字符串)。
如果您不需要在步骤之间传递数据,但想要指定步骤排序依赖项(例如,op2
直到 op1
完成后才会运行),您可以在管道定义中指出这一点,如下所示:
op2.after(op1)
【讨论】:
【参考方案2】:除了艾米的出色回答:
您的管道是正确的。建立组件之间依赖关系的最好方法是建立数据依赖关系。
让我们看看你的管道代码:
task2 = component2(task1.output)
您将task1
的输出传递给component2
。这应该会产生您想要的依赖关系。但是有几个问题(如果您尝试编译,您的管道会显示编译错误):
component1
需要有输出
component2
需要输入
component2
需要有一个输出(以便您可以将其传递给 component3)
等等
让我们添加它们:
name: DPC
description: This is an example
outputs:
- name: output_1
implementation:
container:
image: gcr.io/pro1-in-us/dpc_comp1@sha256:3768383b9cd694936ef00464cb1bdc7f48bc4e9bbf08bde50ac7346f25be15de
command: [python3, /dpc_comp1.py, --output-1-path, outputPath: output_1]
name: Custom_Plugin_1
description: This is an example
inputs:
- name: input_1
outputs:
- name: output_1
implementation:
container:
image: gcr.io/pro1-in-us/plugin1@sha256:16cb4aa9edf59bdf138177d41d46fcb493f84ce798781125dc7777ff5e1602e3
command: [python3, /plugin1.py, --input-1-path, inputPath: input_1, --output-1-path, outputPath: output_1]
通过这些更改,您的管道应该编译并显示您想要的依赖项。
请查看tutorial关于从命令行程序创建组件。
【讨论】:
谢谢!。我们可以并行运行 2 个管道组件吗?就像事件驱动一样.. 当组件 1 从 pdf 文档中拆分页面时,应该向组件 2 触发一个事件,以便它可以开始处理一些 OCR,而不是必须等待组件 1 完成整个 pdf 文档的拆分? 这是可能的(只有两个独立的组件使用 PubSub 等外部服务进行通信),但这不是受支持的配置。 KFP 目前是为批处理而设计的。【参考方案3】:如果你不想通过输出使用依赖或者在组件之间传递任何数据,你可以参考上一步中的 PVC 来显式调用依赖。
示例: 您可以创建 PVC 来存储数据。
vop = dsl.VolumeOp(name="pvc",
resource_name="pvc", size=<size>,
modes=dsl.VOLUME_MODE_RWO,)
在组件中使用它:
download = dsl.ContainerOp(name="download",image="",
command=[" "], arguments=[" "],
pvolumes="/data": vop.volume,)
现在您可以调用下载和训练之间的依赖关系,如下所示:
train = dsl.ContainerOp(name="train",image="",
command=[" "], arguments=[" "],
pvolumes="/data": download.pvolumes["/data"],)
【讨论】:
以上是关于如何连接 kubeflow 管道组件的主要内容,如果未能解决你的问题,请参考以下文章
如何扩展 kubeflow 管道(使用顶点 ai),或者它只是自动完成
是否可以将 kubeflow 组件与 tensorflow 扩展组件混合使用?