Kubeflow 管道终止通知

Posted

技术标签:

【中文标题】Kubeflow 管道终止通知【英文标题】:Kubeflow Pipeline Termination Notificaiton 【发布时间】:2019-12-21 19:39:16 【问题描述】:

我尝试添加一个逻辑,该逻辑将在管道由于某些错误而终止时发送松弛通知。我试图用ExitHandler 来实现它。但是,ExitHandler 似乎不能依赖任何操作。你有什么好主意吗?

【问题讨论】:

【参考方案1】:

我找到了一个使用ExitHandler 的解决方案。我在下面发布我的代码,希望它可以帮助其他人。


def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
    """
    performs slack notifications
    """    
    send_slack_op = dsl.ContainerOp(
        name=name,
        image='wenmin.wu/slack-cli:latest',
        is_exit_handler=is_exit_handler,
        command=['sh', '-c'],
        arguments=["/send-message.sh -d  ''".format(slack_channel, status)]
    )
    send_slack_op.add_env_variable(V1EnvVar(name = 'SLACK_CLI_TOKEN', value_from=V1EnvVarSource(config_map_key_ref=V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN'))))
    return send_slack_op

@dsl.pipeline(
    name='forecasting-supply',
    description='forecasting supply ...'
)
def ml_pipeline(
    param1,
    param2,
    param3,
):
    exit_task = slack_notification(
        slack_channel = slack_channel,
        name = "supply-forecasting",
        status = "Kubeflow pipeline: workflow.name has workflow.status!",
        is_exit_handler = True
    )

    with dsl.ExitHandler(exit_task):
        # put other tasks here

【讨论】:

嗨@wenmin-wu 如何解决workflow.xxx 参数?这是 KFP 的一个功能,在任何地方都没有看到它的记录。你有没有机会知道它是如何工作的? 嗨@AlexLatchford 这是 argo 的一个特性,因为 kubeflow 是基于 argo 的,所以所有的 argo 宏都可以使用。参考github.com/argoproj/argo/blob/master/docs/variables.md查看所有的argo宏。 嘿文敏,非常感谢您的链接!绝对没有意识到这是一个功能,谢谢分享! @WenminWu 我们如何从传递给退出处理程序的任何先前容器中获取输出?我浏览了 argo 文档,似乎没有任何效果。 如何访问 KFPL python 代码中的工作流变量。

以上是关于Kubeflow 管道终止通知的主要内容,如果未能解决你的问题,请参考以下文章

气流和 Kubeflow 管道有啥区别?

同一个 GCP 项目中的 Kubeflow 管道存储访问错误?

如何将 OutputPathPlaceholder 与带有 Kubeflow 管道的字符串连接起来?

如何连接 kubeflow 管道组件

如何扩展 kubeflow 管道(使用顶点 ai),或者它只是自动完成

如何在 kubeflow 管道中传递环境变量?