Airflow 中的动态任务定义

Posted

技术标签:

【中文标题】Airflow 中的动态任务定义【英文标题】:Dynamic task definition in Airflow 【发布时间】:2018-07-27 06:40:16 【问题描述】:

我目前正在尝试使用 Airflow 来编排一个流程,其中一些操作符是动态定义的,并且依赖于另一个(早期)操作符的输出。

在下面的代码中,t1 用新记录更新了一个文本文件(这些记录实际上是从外部队列中读取的,但为简单起见,我在这里将它们硬编码为 A、B 和 C)。然后,我想为从该文本文件中读取的每条记录创建单独的运算符。这些运算符将分别创建目录 A、B 和 C,并且在 Airflow UI 中将被视为单独的 bash 进程 Create_directory_A、Create_directory_B 和 Create_directory_C。

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)


def create_text_file(list_of_rows):
    text_file = open('text_file.txt', "w")
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()


def read_text():
    txt_file = open('text_file.txt', 'r')
    return [element for element in txt_file.readlines()]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_'.format(row),
        bash_command="mkdir params.dir_name",
        params='dir_name': row,
        dag=dag
    )

    t1 >> t2

在Airflow’s documentation 我可以看到调度程序将定期执行它[DAG] 以反映变化(如果有)。这是否意味着即使我的 t1 运算符在 t2 之前执行,bash 运算符也会在更新之前为记录列表创建(因为那是评估 DAG 时)存在风险?

【问题讨论】:

【参考方案1】:

您不能动态创建依赖于上游任务输出的任务。您正在混淆计划和执行时间。一个 DAG 定义 和一个 task 在计划时间创建。 DAG runtask 实例 在执行时创建。只有任务实例可以产生输出。

Airflow 调度程序将在调度时间使用text_file.txt 包含的任何内容构建动态图。然后将这些任务运送给工人。

一个worker最终会执行t1任务实例并创建一个新的text_file.txt,但是此时,t2的任务列表已经被调度器计算好并发送给worker了。

因此,无论最新的 t1 任务实例转储到 text_file.txt 中,都将在下次调度程序决定运行 DAG 时使用。

如果您的任务很快并且您的工作人员没有积压,那将是上一次 DAG 运行的内容。如果它们被积压,text_file.txt 的内容可能是陈旧的,如果你真的不走运,调度程序会在任务实例写入文件时读取文件,你将从read_text() 获得不完整的数据。

【讨论】:

感谢您的解释!我们如何确保 t1 在 t2 之前运行?这与 SubDAG 有关吗? t1 >> t2 就是这样做的——它构建了一个任务的有向无环图 (DAG),描述了任务应该以什么顺序运行。【参考方案2】:

这段代码实际上会创建t2 的一个实例,它将是使用从read_text() 获得的最后一个row 构建的bash 运算符。我确定这不是你想要的。

更好的方法是为 t2 运算符创建一个单独的 DAG,当文件由 t1 写入时触发。有一个关于此的问题可能会有所帮助:Apache Airflow - trigger/schedule DAG rerun on completion (File Sensor)

【讨论】:

谢谢史蒂夫。我意识到我错过了t1 中的额外方括号。就目前而言,代码确实为从read_text() 读取的所有行生成t2。它获得的行列表是否存在过时的风险? 啊,好的。我仍然认为一个单独的 DAG 会更干净,更容易维护,我不会依赖这种工作方式。多个 DAG 在 Airflow 上很常见/ 什么可能是一个好的方法:按计划查询表,如果找到,则为每一行运行任务?

以上是关于Airflow 中的动态任务定义的主要内容,如果未能解决你的问题,请参考以下文章

在 DAG 运行期间动态生成 DAG - Airflow

入门Airflow 如何实现动态DAGs ?利用多任务提升效率

入门Airflow 如何实现动态DAGs ?利用多任务提升效率

入门Airflow 如何实现动态DAGs ?利用多任务提升效率

入门Airflow 如何实现动态DAGs ?利用多任务提升效率

大数据调度平台Airflow:Airflow使用