对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?

Posted

技术标签:

【中文标题】对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?【英文标题】:For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI? 【发布时间】:2019-05-08 20:47:11 【问题描述】:

我使用 Airflow 来管理 ETL 任务的执行和计划。一个 DAG 已创建并且工作正常。但是通过cli手动触发dag时是否可以传递参数。

例如: 我的 DAG 每天 01:30 运行,处理昨天的数据(时间范围从昨天 01:30 到今天 01:30)。数据源可能存在一些问题。我需要重新处理这些数据(手动指定时间范围)。

那么我可以创建这样一个气流 DAG,当它被安排时,默认时间范围是从昨天 01:30 到今天 01:30。那么如果数据源有问题,我需要手动触发 DAG 并手动将时间范围作为参数传递。

据我所知 airflow test-tp 可以将参数传递给任务。但这仅用于测试特定任务。而airflow trigger_dag 没有-tp 选项。那么有没有什么办法可以通过tigger_dag向DAG传递参数,然后Operator可以读取这些参数呢?

谢谢!

【问题讨论】:

【参考方案1】:
key: ['param1=somevalue1', 'param2=somevalue2']

第一种方式:

" dag_run.conf["key"] "

这会将传递的值呈现为字符串"['param1=somevalue1', 'param2=somevalue2']"

第二种方式:

def get_parameters(self, **kwargs):
    dag_run = kwargs.get('dag_run')
    parameters = dag_run.conf['key']
    return parameters

在这种情况下,正在传递一个字符串列表并将其呈现为列表['param1=somevalue1', 'param2=somevalue2']

【讨论】:

【参考方案2】:

您可以使用 --conf '"key":"value"' 从 CLI 传递参数,然后在 DAG 文件中将其用作模板字段中的 " dag_run.conf["key"] "

CLI

airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '"message":"value"'

DAG 文件

args = 
    'start_date': datetime.utcnow(),
    'owner': 'airflow',


dag = DAG(
    dag_id='example_dag_conf',
    default_args=args,
    schedule_interval=None,
)

def run_this_func(ds, **kwargs):
    print("Remotely received value of  for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 ' dag_run.conf["message"] if dag_run else "" " ',
    dag=dag,
)

【讨论】:

有没有办法在非模板字段中传递参数? @AshuGG 当你设置 provide_context=True 时,运行参数会在 kwargs['dag_run'].conf ,如代码示例所示 我也需要访问该参数值,因为我需要循环到该值并创建任务。如何在非模板字段之外访问该参数值? 你不能这样做,你需要使用气流变量:)【参考方案3】:

根据气流文档,这应该可以工作:https://airflow.apache.org/cli.html#trigger_dag

airflow trigger_dag -c '"key1":1, "key2":2' dag_id

确保-c 的值是一个有效的json 字符串,所以这里需要用双引号包裹键。

【讨论】:

以上是关于对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?的主要内容,如果未能解决你的问题,请参考以下文章

安装 Apache Airflow 后出错

Google Cloud Composer (Apache Airflow) 无法访问日志文件

Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区

Apache Airflow 中的分布式日志记录

任务调度工具 Apache Airflow 初识

使用 Apache 气流存储和访问密码