Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date

Posted

技术标签:

【中文标题】Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date【英文标题】:Apache Airflow - How to set execution_date using TriggerDagRunOperator in target DAG for use the current execution_date 【发布时间】:2019-05-15 16:02:10 【问题描述】:

我想在触发器 DAG 中设置 execution_date。我用的是运算符TriggerDagRunOperator,这个运算符有参数execution_date,我想设置当前的execution_date。

def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    pp = pprint.PrettyPrinter(indent=4)
    c_p = Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1"
    print("Controller DAG : conditionally_trigger = ".format(c_p))
    if Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1":
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj

default_args = 
    'owner': 'pepito',
    'depends_on_past': False,
    'retries': 2,
    'start_date': datetime(2018, 12, 1, 0, 0),
    'email': ['xxxx@yyyyy.net'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=1)


dag = DAG(
    'DAG_1',
    default_args=default_args,
    schedule_interval="0 12 * * 1",
    dagrun_timeout=timedelta(hours=22),
    max_active_runs=1,
    catchup=False
)

trigger_dag_2 = TriggerDagRunOperator(
    task_id='trigger_dag_2',
    trigger_dag_id="DAG_2",
   python_callable=conditionally_trigger,
    execution_date= execution_date ,
   dag=dag,
   pool='a_roz'
)

但我得到下一个错误

名称“执行日期”未定义

如果我设置

execution_date= 'execution_date' ,

execution_date=' execution_date ',

我得到

Traceback(最近一次调用最后一次):

文件“/usr/local/lib/python3.6/site-packages/airflow/models.py”,第 1659 行,在 _run_raw_task 中

结果 = task_copy.execute(context=context)

文件“/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py”,第 78 行,在执行中

replace_microseconds=False)

文件“/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py”,第 98 行,在 trigger_dag 中

replace_microseconds=replace_microseconds,

文件“/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py”,第 45 行,在 _trigger_dag 中

断言 timezone.is_localized(execution_date)

文件“/usr/local/lib/python3.6/site-packages/airflow/utils/timezone.py”,第 38 行,在 is_localized 中

return value.utcoffset() 不是 None

AttributeError: 'str' 对象没有属性 'utcoffset'

如果我想等于 DAG_1,有谁知道如何设置 DAG_2 的执行日期?

这个问题与airflow TriggerDagRunOperator how to change the execution date不同,因为在这篇文章中没有解释如何通过运算符TriggerDagRunOperator发送execution_date,只是说存在这种可能性。 https://***.com/a/49442868/10269204

【问题讨论】:

airflow TriggerDagRunOperator how to change the execution date的可能重复 本帖中没有说明如何通过算子TriggerDagRunOperator发送execution_date,只是说存在这种可能性。 【参考方案1】:

以前没有模板化,但现在用这个commit模板化了

您可以使用新版本的气流尝试您的代码

另外对于硬编码的execution_date,你需要设置tzinfo:

from datetime import datetime, timezone
execution_date=datetime(2019, 3, 27, tzinfo=timezone.utc)
# or:
execution_date=datetime.now().replace(tzinfo=timezone.utc)

【讨论】:

我正在为 execution_date 尝试宏 ds ts ,但 datetime(...) 没有用。非常感谢@mustafagok

以上是关于Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date的主要内容,如果未能解决你的问题,请参考以下文章

Apache Airflow:将s3复制到s3的运算符

如何在 AWS Managed Workflows for Apache Airflow 中启用 API?

对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?

使用 Apache 气流存储和访问密码

在Conda环境中安装Apache-Airflow

Apache Atlas 和 Airflow 集成