对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?
Posted
技术标签:
【中文标题】对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?【英文标题】:For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI? 【发布时间】:2019-05-08 20:47:11 【问题描述】:我使用 Airflow 来管理 ETL 任务的执行和计划。一个 DAG 已创建并且工作正常。但是通过cli手动触发dag时是否可以传递参数。
例如: 我的 DAG 每天 01:30 运行,处理昨天的数据(时间范围从昨天 01:30 到今天 01:30)。数据源可能存在一些问题。我需要重新处理这些数据(手动指定时间范围)。
那么我可以创建这样一个气流 DAG,当它被安排时,默认时间范围是从昨天 01:30 到今天 01:30。那么如果数据源有问题,我需要手动触发 DAG 并手动将时间范围作为参数传递。
据我所知 airflow test
有 -tp
可以将参数传递给任务。但这仅用于测试特定任务。而airflow trigger_dag
没有-tp
选项。那么有没有什么办法可以通过tigger_dag向DAG传递参数,然后Operator可以读取这些参数呢?
谢谢!
【问题讨论】:
【参考方案1】:key: ['param1=somevalue1', 'param2=somevalue2']
第一种方式:
" dag_run.conf["key"] "
这会将传递的值呈现为字符串"['param1=somevalue1', 'param2=somevalue2']"
第二种方式:
def get_parameters(self, **kwargs):
dag_run = kwargs.get('dag_run')
parameters = dag_run.conf['key']
return parameters
在这种情况下,正在传递一个字符串列表并将其呈现为列表['param1=somevalue1', 'param2=somevalue2']
【讨论】:
【参考方案2】:您可以使用 --conf '"key":"value"'
从 CLI 传递参数,然后在 DAG 文件中将其用作模板字段中的 " dag_run.conf["key"] "
。
CLI:
airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '"message":"value"'
DAG 文件:
args =
'start_date': datetime.utcnow(),
'owner': 'airflow',
dag = DAG(
dag_id='example_dag_conf',
default_args=args,
schedule_interval=None,
)
def run_this_func(ds, **kwargs):
print("Remotely received value of for key=message".
format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
# You can also access the DagRun object in templates
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: '
' dag_run.conf["message"] if dag_run else "" " ',
dag=dag,
)
【讨论】:
有没有办法在非模板字段中传递参数? @AshuGG 当你设置 provide_context=True 时,运行参数会在 kwargs['dag_run'].conf ,如代码示例所示 我也需要访问该参数值,因为我需要循环到该值并创建任务。如何在非模板字段之外访问该参数值? 你不能这样做,你需要使用气流变量:)【参考方案3】:根据气流文档,这应该可以工作:https://airflow.apache.org/cli.html#trigger_dag
airflow trigger_dag -c '"key1":1, "key2":2' dag_id
确保-c
的值是一个有效的json 字符串,所以这里需要用双引号包裹键。
【讨论】:
以上是关于对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?的主要内容,如果未能解决你的问题,请参考以下文章
Google Cloud Composer (Apache Airflow) 无法访问日志文件
Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区