Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date
Posted
技术标签:
【中文标题】Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date【英文标题】:Apache Airflow - How to set execution_date using TriggerDagRunOperator in target DAG for use the current execution_date 【发布时间】:2019-05-15 16:02:10 【问题描述】:我想在触发器 DAG 中设置 execution_date。我用的是运算符TriggerDagRunOperator,这个运算符有参数execution_date,我想设置当前的execution_date。
def conditionally_trigger(context, dag_run_obj):
"""This function decides whether or not to Trigger the remote DAG"""
pp = pprint.PrettyPrinter(indent=4)
c_p = Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1"
print("Controller DAG : conditionally_trigger = ".format(c_p))
if Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1":
pp.pprint(dag_run_obj.payload)
return dag_run_obj
default_args =
'owner': 'pepito',
'depends_on_past': False,
'retries': 2,
'start_date': datetime(2018, 12, 1, 0, 0),
'email': ['xxxx@yyyyy.net'],
'email_on_failure': False,
'email_on_retry': False,
'retry_delay': timedelta(minutes=1)
dag = DAG(
'DAG_1',
default_args=default_args,
schedule_interval="0 12 * * 1",
dagrun_timeout=timedelta(hours=22),
max_active_runs=1,
catchup=False
)
trigger_dag_2 = TriggerDagRunOperator(
task_id='trigger_dag_2',
trigger_dag_id="DAG_2",
python_callable=conditionally_trigger,
execution_date= execution_date ,
dag=dag,
pool='a_roz'
)
但我得到下一个错误
名称“执行日期”未定义
如果我设置
execution_date= 'execution_date' ,
或
execution_date=' execution_date ',
我得到
Traceback(最近一次调用最后一次):
文件“/usr/local/lib/python3.6/site-packages/airflow/models.py”,第 1659 行,在 _run_raw_task 中
结果 = task_copy.execute(context=context)
文件“/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py”,第 78 行,在执行中
replace_microseconds=False)
文件“/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py”,第 98 行,在 trigger_dag 中
replace_microseconds=replace_microseconds,
文件“/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py”,第 45 行,在 _trigger_dag 中
断言 timezone.is_localized(execution_date)
文件“/usr/local/lib/python3.6/site-packages/airflow/utils/timezone.py”,第 38 行,在 is_localized 中
return value.utcoffset() 不是 None
AttributeError: 'str' 对象没有属性 'utcoffset'
如果我想等于 DAG_1,有谁知道如何设置 DAG_2 的执行日期?
这个问题与airflow TriggerDagRunOperator how to change the execution date不同,因为在这篇文章中没有解释如何通过运算符TriggerDagRunOperator发送execution_date,只是说存在这种可能性。 https://***.com/a/49442868/10269204
【问题讨论】:
airflow TriggerDagRunOperator how to change the execution date的可能重复 本帖中没有说明如何通过算子TriggerDagRunOperator发送execution_date,只是说存在这种可能性。 【参考方案1】:以前没有模板化,但现在用这个commit模板化了
您可以使用新版本的气流尝试您的代码
另外对于硬编码的execution_date,你需要设置tzinfo:
from datetime import datetime, timezone
execution_date=datetime(2019, 3, 27, tzinfo=timezone.utc)
# or:
execution_date=datetime.now().replace(tzinfo=timezone.utc)
【讨论】:
我正在为execution_date
尝试宏 ds
和 ts
,但 datetime(...) 没有用。非常感谢@mustafagok以上是关于Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date的主要内容,如果未能解决你的问题,请参考以下文章
如何在 AWS Managed Workflows for Apache Airflow 中启用 API?