Airflow 中的动态任务定义
Posted
技术标签:
【中文标题】Airflow 中的动态任务定义【英文标题】:Dynamic task definition in Airflow 【发布时间】:2018-07-27 06:40:16 【问题描述】:我目前正在尝试使用 Airflow 来编排一个流程,其中一些操作符是动态定义的,并且依赖于另一个(早期)操作符的输出。
在下面的代码中,t1 用新记录更新了一个文本文件(这些记录实际上是从外部队列中读取的,但为简单起见,我在这里将它们硬编码为 A、B 和 C)。然后,我想为从该文本文件中读取的每条记录创建单独的运算符。这些运算符将分别创建目录 A、B 和 C,并且在 Airflow UI 中将被视为单独的 bash 进程 Create_directory_A、Create_directory_B 和 Create_directory_C。
dag = DAG('Test_DAG',
description="Lorem ipsum.",
start_date=datetime(2017, 3, 20),
schedule_interval=None,
catchup=False)
def create_text_file(list_of_rows):
text_file = open('text_file.txt', "w")
for row in list_of_rows:
text_file.write(row + '\n')
text_file.close()
def read_text():
txt_file = open('text_file.txt', 'r')
return [element for element in txt_file.readlines()]
t1 = PythonOperator(
task_id='Create_text_file',
python_callable=create_text_file,
op_args=[['A', 'B', 'C']],
dag=dag
)
for row in read_text():
t2 = BashOperator(
task_id='Create_directory_'.format(row),
bash_command="mkdir params.dir_name",
params='dir_name': row,
dag=dag
)
t1 >> t2
在Airflow’s documentation 我可以看到调度程序将定期执行它[DAG] 以反映变化(如果有)。这是否意味着即使我的 t1 运算符在 t2 之前执行,bash 运算符也会在更新之前为记录列表创建(因为那是评估 DAG 时)存在风险?
【问题讨论】:
【参考方案1】:您不能动态创建依赖于上游任务输出的任务。您正在混淆计划和执行时间。一个 DAG 定义 和一个 task 在计划时间创建。 DAG run 和 task 实例 在执行时创建。只有任务实例可以产生输出。
Airflow 调度程序将在调度时间使用text_file.txt
包含的任何内容构建动态图。然后将这些任务运送给工人。
一个worker最终会执行t1
任务实例并创建一个新的text_file.txt
,但是此时,t2
的任务列表已经被调度器计算好并发送给worker了。
因此,无论最新的 t1
任务实例转储到 text_file.txt
中,都将在下次调度程序决定运行 DAG 时使用。
如果您的任务很快并且您的工作人员没有积压,那将是上一次 DAG 运行的内容。如果它们被积压,text_file.txt
的内容可能是陈旧的,如果你真的不走运,调度程序会在任务实例写入文件时读取文件,你将从read_text()
获得不完整的数据。
【讨论】:
感谢您的解释!我们如何确保 t1 在 t2 之前运行?这与 SubDAG 有关吗?t1 >> t2
就是这样做的——它构建了一个任务的有向无环图 (DAG),描述了任务应该以什么顺序运行。【参考方案2】:
这段代码实际上会创建t2
的一个实例,它将是使用从read_text()
获得的最后一个row
构建的bash 运算符。我确定这不是你想要的。
更好的方法是为 t2
运算符创建一个单独的 DAG,当文件由 t1
写入时触发。有一个关于此的问题可能会有所帮助:Apache Airflow - trigger/schedule DAG rerun on completion (File Sensor)
【讨论】:
谢谢史蒂夫。我意识到我错过了t1
中的额外方括号。就目前而言,代码确实为从read_text()
读取的所有行生成t2
。它获得的行列表是否存在过时的风险?
啊,好的。我仍然认为一个单独的 DAG 会更干净,更容易维护,我不会依赖这种工作方式。多个 DAG 在 Airflow 上很常见/
什么可能是一个好的方法:按计划查询表,如果找到,则为每一行运行任务?以上是关于Airflow 中的动态任务定义的主要内容,如果未能解决你的问题,请参考以下文章
入门Airflow 如何实现动态DAGs ?利用多任务提升效率
入门Airflow 如何实现动态DAGs ?利用多任务提升效率
入门Airflow 如何实现动态DAGs ?利用多任务提升效率