Airflow dynamically generated tasks not running in order



Posted: 2021-08-12 15:03:54

Question:

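I created a DAG that generates its tasks dynamically. The tasks are generated correctly, but they are not triggered in order, and the order is not consistent either. I noticed that they are triggered in alphanumeric order. Take the run_modification_ tasks as an example: I generated tasks 0 to 29, and they were triggered in this order: run_modification_0 run_modification_1 run_modification_10 run_modification_11 run_modification_12 run_modification_13 run_modification_14 run_modification_15 run_modification_16 run_modification_17 run_modification_18 run_modification_19 run_modification_2 run_modification_21 run_modification_23....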

But I need them to run in task order, i.e. run_modification_0 run_modification_1 run_modification_2 run_modification_3 run_modification_4 run_modification_5..

Please help me run these tasks in the order in which they were created.

from datetime import date, timedelta, datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
import os

args = {
        'owner': 'Airflow',
        'start_date': days_ago(2),
}


dag = DAG(
        dag_id='tastOrder',
        default_args=args,
        schedule_interval=None,
        tags=['task']
)

# Jinja-templated command: cd into the folder name that run_modification_<i> pushed to XCom.
# Named differently from the BashOperator variable in the loop so the loop does not overwrite it.
modification_processXcom_cmd = """ cd {{ ti.xcom_pull(task_ids='run_modification_' + params.i, key='taskDateFolder') }} """


def modificationProcess(ds, **kwargs):
    today = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    i = str(kwargs['i'])
    newDate = today - timedelta(days=int(i))
    print(str(newDate))
    # Push the computed date so the matching BashOperator can pull it
    kwargs["ti"].xcom_push("taskDateFolder", str(newDate))



def getDays():
    today = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    yesterday = today - timedelta(days=30)
    day_Diff = today - yesterday
    return day_Diff, today

day_Diff, today = getDays()
# Each iteration creates an independent pair; nothing links one pair to the next,
# so Airflow is free to run the pairs in any order.
for i in reversed(range(0, day_Diff.days)):
    run_modification = PythonOperator(
        task_id='run_modification_' + str(i),
        provide_context=True,
        python_callable=modificationProcess,
        op_kwargs={'i': str(i)},
        dag=dag,
    )

    modification_processXcom = BashOperator(
        task_id='modification_processXcom_' + str(i),
        bash_command=modification_processXcom_cmd,
        params={'i': str(i)},
        dag=dag,
    )

    run_modification >> modification_processXcom
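Tasks with no dependency on one another carry no ordering guarantee in Airflow. The order reported above (0, 1, 10, 11, ..., 19, 2, ...) largely matches what plain string (lexicographic) sorting of the task IDs produces. A small Airflow-free Python sketch comparing string sorting with a numeric "natural" sort of the same IDs; the helper `natural_key` is hypothetical and only illustrates the difference:

```python
import re

# Task IDs as generated in the question: run_modification_0 .. run_modification_29
task_ids = [f"run_modification_{i}" for i in range(30)]


def natural_key(task_id: str):
    """Hypothetical helper: split an ID into text and integer parts
    so numeric suffixes compare as numbers, not as strings."""
    return [int(part) if part.isdigit() else part
            for part in re.split(r"(\d+)", task_id)]


# Plain string sort: "10" < "2" because '1' < '2' character by character.
lexicographic = sorted(task_ids)

# Natural sort: the numeric suffix is compared as an integer.
natural = sorted(task_ids, key=natural_key)

print(lexicographic[:5])
# ['run_modification_0', 'run_modification_1', 'run_modification_10', 'run_modification_11', 'run_modification_12']
print(natural[:5])
# ['run_modification_0', 'run_modification_1', 'run_modification_2', 'run_modification_3', 'run_modification_4']
```

Sorting only changes how IDs are listed; to force a run order, the tasks need explicit dependencies between them.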

Question comments:

Please share your code.

@Elad, code shared.

What structure are you looking for? run_modification_1 -> modification_processXcom_1 -> run_modification_2 -> modification_processXcom_2 -> ... -> run_modification_29 -> modification_processXcom_29?

See the edited answer.

Answer 1:

To get these dependencies:

run_modification_1 -> modification_processXcom_1 ->
run_modification_2 -> modification_processXcom_2 -> ... ->
run_modification_29 -> modification_processXcom_29
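That chain is a linear dependency graph, so it admits exactly one valid execution order. A sketch that models the same edges with Python's standard-library `graphlib` (Python 3.9+, not Airflow; the nodes are plain strings standing in for tasks) and prints the order a dependency-respecting scheduler would have to follow:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

N = 29  # run_modification_1 .. run_modification_29, as in the requested chain

# Map each task name to the set of task names it depends on (its upstream tasks).
graph = {}
for i in range(1, N + 1):
    run = f"run_modification_{i}"
    proc = f"modification_processXcom_{i}"
    graph[proc] = {run}  # run_modification_i -> modification_processXcom_i
    # run_modification_i waits for the previous pair, except for the first one
    graph[run] = {f"modification_processXcom_{i - 1}"} if i > 1 else set()

order = list(TopologicalSorter(graph).static_order())
print(order[:4])
# ['run_modification_1', 'modification_processXcom_1', 'run_modification_2', 'modification_processXcom_2']
```

Because every task has at most one upstream and one downstream, there is no freedom left for the order; without the cross-pair edges, many orders would be valid.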

You can do it like this:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator


dag = DAG(
    dag_id='my_dag',
    schedule_interval=None,
    start_date=datetime(2021, 8, 10),
    catchup=False,
    is_paused_upon_creation=False,
)

mylist1 = []
mylist2 = []
for i in range(1, 30):
    mylist1.append(
        BashOperator(  # Replace with your requested operator
            task_id=f'run_modification_{i}',
            bash_command=f"""echo executing run_modification_{i}""",
            dag=dag,
        )
    )
    mylist2.append(
        BashOperator(  # Replace with your requested operator
            task_id=f'modification_processXcom_{i}',
            bash_command=f"""echo executing modification_processXcom_{i}""",
            dag=dag,
        )
    )
    # These checks run inside the loop, so every pair gets linked, not just the last one
    if len(mylist1) > 0:
        mylist1[-1] >> mylist2[-1]  # run_modification_i -> modification_processXcom_i
    if len(mylist1) > 1:
        mylist2[-2] >> mylist1[-1]  # previous modification_processXcom -> current run_modification

This code creates two lists of operators and sets them to run one after another:

[Image: Tree view]
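In Airflow, `a >> b` declares that `b` runs after `a` by overloading Python's `>>` operator. A toy, Airflow-free model of that idea (the class `ToyTask` and its `upstream` list are hypothetical, not Airflow's `BaseOperator`), reusing the answer's list-and-index pattern to show which upstream task each task ends up with:

```python
class ToyTask:
    """Hypothetical stand-in for an operator: it only records upstream task IDs."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream = []

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # `self >> other` means: other runs after self.
        other.upstream.append(self.task_id)
        return other  # returning `other` allows a >> b >> c chaining


mylist1, mylist2 = [], []
for i in range(1, 30):
    mylist1.append(ToyTask(f"run_modification_{i}"))
    mylist2.append(ToyTask(f"modification_processXcom_{i}"))
    # Same indexing as the answer, executed inside the loop on every iteration.
    mylist1[-1] >> mylist2[-1]      # run_modification_i -> modification_processXcom_i
    if len(mylist1) > 1:
        mylist2[-2] >> mylist1[-1]  # modification_processXcom_{i-1} -> run_modification_i

print(mylist1[1].task_id, "waits for", mylist1[1].upstream)
# run_modification_2 waits for ['modification_processXcom_1']
```

If the two linking statements are moved after the loop instead, only pair 29 (and its link from modification_processXcom_28) gets dependencies, and the other pairs are left unconnected.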

Comments:

Hi Elad, it works as I expected. Thanks!
