根据气流中 sql 查询的结果创建动态任务

Posted

技术标签:

【中文标题】根据气流中 sql 查询的结果创建动态任务【英文标题】:Create dynamic tasks depending on the result of an sql query in airflow 【发布时间】:2021-06-12 16:32:21 【问题描述】:

我正在尝试使用 TaskGroup 创建动态任务,并将结果保存在变量中。该变量每 N 分钟修改一次,具体取决于数据库查询,但是当第二次修改该变量时,调度程序出现故障

基本上我需要根据查询中收到的唯一行数来创建任务。

以 TaskGroup(f"task") 作为任务:

    data_variable = Variable.get("df")
    data = data_variable

    try :
        if data != False and data !='none':
            df = pd.read_json(data)

            for field_id in df.field.unique():
             

                task1 = PythonOperator(
                   
                )
                task2 = PythonOperator(
                   
                )

               
                task1 >> task2

    except:
        pass

有没有办法通过任务组来做到这一点?

【问题讨论】:

【参考方案1】:

这是 Airflow 的反模式。

虽然您可以在***代码中使用Variable.get("df"),但您不应该这样做。使用任何数据库创建查询的变量/连接/任何其他代码只能在操作员范围内或使用 Jinja 模板完成。原因是 Airflow 会定期解析 DAG 文件(如果您没有更改 min_file_process_interval 的默认值,则每 30 秒解析一次),因此每 30 秒与数据库交互的代码将导致该数据库的负载过重。 对于其中一些情况,未来的气流版本会发出警告(请参阅PR)

气流任务应尽可能静态(或缓慢变化)。

【讨论】:

是的,我明白了,这只是动态生成任务的可能解决方案(不起作用)。任务组提供 xcom 或 default_args 来执行此操作似乎很理想,但似乎还不可能:github.com/apache/airflow/issues/13911任何其他解决方案? 您对此有什么建议吗? 这仍然是一种反模式。 Airflow 并非旨在根据仅在运行时已知的数据创建任务。

以上是关于根据气流中 sql 查询的结果创建动态任务的主要内容,如果未能解决你的问题,请参考以下文章

查询值和目标字段的数量不同 - C# 脚本任务 SSIS - 使用动态列将 SQL Proc 的结果导出到 Excel

apache气流的sql查询

Informix SQL 11.5 将查询结果存储在具有动态名称的文件中

如何循环在 SQL Server 中动态创建的查询

气流 - 如何根据运营商结果发送电子邮件?

气流动态生成的任务未按顺序运行