气流:一次运行气流子项的模式
Posted
技术标签:
【中文标题】气流:一次运行气流子项的模式【英文标题】:Airflow: pattern to run airflow subdag once 【发布时间】:2017-09-16 04:18:52 【问题描述】:来自气流文档:
SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
我知道 subdagooperator 实际上是作为 BackfillJob 实现的,因此我们必须向操作员提供 schedule_interval
。但是,有没有办法为 subdag 获得 schedule_interval="@once"
的语义等价物?我担心如果我将 set schedule_interval="@daily"
用于 subdag,如果 subdag 运行时间超过一天,则 subdag 可能会运行多次。
def subdag_factory(parent_dag_name, child_dag_name, args):
subdag = DAG(
dag_id="parent_dag_name.child_dag_name".format(
parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
),
schedule_interval="@daily", # <--- this bit here
default_args=args
)
... do more stuff to the subdag here
return subdag
TLDR:如何伪造“每次触发父 dag 时只运行一次这个 subdag”
【问题讨论】:
【参考方案1】:我发现
schedule=@once
非常适合我的 subdags。也许我的版本已经过时了,但即使所有任务都成功(或被跳过),我的 subdagsfailing 也有更多问题。
实际的示例代码现在在我的机器上运行得非常愉快:
subdag_name = ".".join((parent_name,child_name))
logging.info(parent_name)
logging.info(subdag_name)
dag_subdag = DAG(
dag_id=subdag_name,
default_args=dargs,
schedule_interval="@once",
)
事实上,我最初将几乎所有的 dag 构建为我的 subdag 的美化 cfg 文件。不确定经过反复试验后的想法有多好,但计划间隔对我来说从来都不是障碍。
我正在运行一个相对较新的 1.8 版本,几乎没有自定义。我一直在遵循示例 dag 建议,将我的 subdag 保存在 dags 文件夹内的文件夹中,这样它们就不会出现在 DagBag 中。
【讨论】:
我正在使用气流 1.7.1.3 和 1.8 不是选项 ATM,因为该版本意外破坏了自定义执行器插件。我会看一下 1.8,看看是否可以运行时间表为"@once"
的 subdags,但如果这是真的,我会感到惊讶,因为文档说它不是。
运气好吗?我的代码仍在愉快地逃跑。我试图在 1.7 中查找为您执行此操作的规范方法。我能找到的最接近的东西(假设@once
不可行)是设置你的execution_timeout
,因为实际的 subdag 任务比你在 subdag 本身中设置的执行频率短。这样你就可以在你的 subdag 启动更多任务之前超时。我知道这是猜测,但我无法轻易地在我们的前叉中找到与您使用的一样古老的气流。
很想听听作者的意见,为什么当文档明确表示不应该这样做时它会起作用。【参考方案2】:
为 subdag 尝试使用 schedule=None 的外部触发模式。在这种情况下,它只会在被父 dag 触发时运行
【讨论】:
为了澄清,您建议使用TriggerDagRunOperator 来触发没有时间表的dag? subdag 的关键是我们想要 blocking 语义,trigger dagrun 操作符只是触发一个 dagrun 然后继续前进,而不是等到 dagrun 完成。此外,您不会在运行 subdag 的气流 UI 中获得透明度,您只知道触发了一些随机 dagrun。以上是关于气流:一次运行气流子项的模式的主要内容,如果未能解决你的问题,请参考以下文章