气流:一次运行气流子项的模式

Posted

技术标签:

【中文标题】气流:一次运行气流子项的模式【英文标题】:Airflow: pattern to run airflow subdag once 【发布时间】:2017-09-16 04:18:52 【问题描述】:

来自气流文档:

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything

我知道 subdagooperator 实际上是作为 BackfillJob 实现的,因此我们必须向操作员提供 schedule_interval。但是,有没有办法为 subdag 获得 schedule_interval="@once" 的语义等价物?我担心如果我将 set schedule_interval="@daily" 用于 subdag,如果 subdag 运行时间超过一天,则 subdag 可能会运行多次。

def subdag_factory(parent_dag_name, child_dag_name, args):
    subdag = DAG(
        dag_id="parent_dag_name.child_dag_name".format(
            parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
        ),
        schedule_interval="@daily", # <--- this bit here
        default_args=args
    )

    ... do more stuff to the subdag here
    return subdag

TLDR:如何伪造“每次触发父 dag 时只运行一次这个 subdag”

【问题讨论】:

【参考方案1】:

我发现 schedule=@once 非常适合我的 subdags。也许我的版本已经过时了,但即使所有任务都成功(或被跳过),我的 subdagsfailing 也有更多问题。

实际的示例代码现在在我的机器上运行得非常愉快:

subdag_name = ".".join((parent_name,child_name))
logging.info(parent_name)
logging.info(subdag_name)
dag_subdag = DAG(
    dag_id=subdag_name,
    default_args=dargs,
    schedule_interval="@once",
)

事实上,我最初将几乎所有的 dag 构建为我的 subdag 的美化 cfg 文件。不确定经过反复试验后的想法有多好,但计划间隔对我来说从来都不是障碍。

我正在运行一个相对较新的 1.8 版本,几乎没有自定义。我一直在遵循示例 dag 建议,将我的 subdag 保存在 dags 文件夹内的文件夹中,这样它们就不会出现在 DagBag 中。

【讨论】:

我正在使用气流 1.7.1.3 和 1.8 不是选项 ATM,因为该版本意外破坏了自定义执行器插件。我会看一下 1.8,看看是否可以运行时间表为 "@once" 的 subdags,但如果这是真的,我会感到惊讶,因为文档说它不是。 运气好吗?我的代码仍在愉快地逃跑。我试图在 1.7 中查找为您执行此操作的规范方法。我能找到的最接近的东西(假设@once 不可行)是设置你的execution_timeout,因为实际的 subdag 任务比你在 subdag 本身中设置的执行频率短。这样你就可以在你的 subdag 启动更多任务之前超时。我知道这是猜测,但我无法轻易地在我们的前叉中找到与您使用的一样古老的气流。 很想听听作者的意见,为什么当文档明确表示不应该这样做时它会起作用。【参考方案2】:

为 subdag 尝试使用 schedule=None 的外部触发模式。在这种情况下,它只会在被父 dag 触发时运行

【讨论】:

为了澄清,您建议使用TriggerDagRunOperator 来触发没有时间表的dag? subdag 的关键是我们想要 blocking 语义,trigger dagrun 操作符只是触发一个 dagrun 然后继续前进,而不是等到 dagrun 完成。此外,您不会在运行 subdag 的气流 UI 中获得透明度,您只知道触发了一些随机 dagrun。

以上是关于气流:一次运行气流子项的模式的主要内容,如果未能解决你的问题,请参考以下文章

如何防止气流回填dag运行?

气流 - 如何仅“填充 DagBag”一次

在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建表但表删除不起作用

气流 - 如何获得所有未来的运行日期

气流回填澄清

如果任何任务失败,气流回填将停止