Airflow 中的 BigQuery 参数化查询
Posted
技术标签:
【中文标题】Airflow 中的 BigQuery 参数化查询【英文标题】:BigQuery parameterized queries in Airflow 【发布时间】:2020-12-02 12:22:25 【问题描述】:我正在尝试使用 Airflow 在 BigQuery 中创建参数化查询。将在 Airflow 中设置的参数,并将更改将在 BigQuery 中运行的查询中的值。
例如,这是我的查询:
select id from table where id = params.number_days
气流代码包括:
( task_id='表', use_legacy_sql=假, write_disposition='WRITE_TRUNCATE', allow_large_results=真, bql=merchant_rank_query, destination_dataset_table='prod.table_result', 参数 = 'number_days': 1, dag=dag)
这样不行,正确的方法是什么?
【问题讨论】:
【参考方案1】:对于 Airflow,具体如下所示:
params =
'number_days': 'value_abc'
dag = DAG(
'example_dag',
schedule_interval='@daily',
default_args=default_args,
params=params,
)
确保在您的 sql 查询中设置 jinja 模板。
select id from table where id = params.number_days
一篇不太针对操作员的中等文章,但一般解释了一些技巧,还有参数用法。
(4) “参数”参数
https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f
第二个选项 - BigQuery 查询参数
如果您的 BigQuery 运算符支持 query_params,您可以使用如下内容:
#on top of your code, can be before declaring the dag
dag_exec_date_time = ' ts '
...
...
...
#then your bigquery operator, if supports query_params
query_params=[
"name": "dag_exec_date_time",
"parameterType": "type": "STRING" ,
"parameterValue": "value": dag_exec_date_time
],
sql查询
select cast(@dag_exec_date_time as timestamp) as airflow_timestamp
【讨论】:
【参考方案2】:要运行parameterized query,代码如下所示:
sql = "select id from table where id = @days"
job_config = bigquery.QueryJobConfig()
job_config.query_parameters = [bigquery.ScalarQueryParameter('days','INT64',params.number_days)]
query_job = bigquery.Client().query(command=sql, job_config=job_config)
results = query_job.result()
if query_job.errors:
# log all errors then raise first error
for e in query_job.errors:
logging.error(e)
raise Exception(query_job.errors[0])
Bigquery 不是普通的关系数据库。没有“计划缓存”或其他优化可以使参数化查询更好。因此,除非存在某种可能是 SQL 注入的不安全输入,否则我建议不要使用参数并将值放入 SQL 中,因为它非常简单:
sql = f"select id from table where id = params.number_days"
query_job = bigquery.Client().query(command=sql)
...
【讨论】:
我认为您的回答脱离了 Apache-Airflow 的上下文。我相信他想将 Airflow 提供的 Jinja 模板用于参数化查询。params
是 Airflow 传递给每个操作员的字典的名称。
@Luis 你说得对,这段代码在气流之外有效。我也在气流中使用它。以上是关于Airflow 中的 BigQuery 参数化查询的主要内容,如果未能解决你的问题,请参考以下文章
使用 Airflow 将 Bigquery 查询结果发送到数据框
如何设置 Airflow DAG 权限以查询基于 Google Sheets 文档构建的 BigQuery 表?
如何安全地为 bigquery 节点插入转义用户输入?可以在 bigquery.insert 节点库上使用参数化查询吗?