Airflow 中的 BigQuery 参数化查询

Posted

技术标签:

【中文标题】Airflow 中的 BigQuery 参数化查询【英文标题】:BigQuery parameterized queries in Airflow 【发布时间】:2020-12-02 12:22:25 【问题描述】:

我正在尝试使用 Airflow 在 BigQuery 中创建参数化查询。将在 Airflow 中设置的参数,并将更改将在 BigQuery 中运行的查询中的值。

例如,这是我的查询:

select id from table where id = params.number_days

气流代码包括:

( task_id='表', use_legacy_sql=假, write_disposition='WRITE_TRUNCATE', allow_large_results=真, bql=merchant_rank_query, destination_dataset_table='prod.table_result', 参数 = 'number_days': 1, dag=dag)

这样不行,正确的方法是什么?

【问题讨论】:

【参考方案1】:

对于 Airflow,具体如下所示:

params = 
    'number_days': 'value_abc'


dag = DAG(
    'example_dag',
    schedule_interval='@daily',
    default_args=default_args,
    params=params,
)

确保在您的 sql 查询中设置 jinja 模板。

select id from table where id = params.number_days

一篇不太针对操作员的中等文章,但一般解释了一些技巧,还有参数用法。

(4) “参数”参数

https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f

第二个选项 - BigQuery 查询参数

如果您的 BigQuery 运算符支持 query_params,您可以使用如下内容:

#on top of your code, can be before declaring the dag
dag_exec_date_time = ' ts '
...
...
...
#then your bigquery operator, if supports query_params

  query_params=[
      
          "name": "dag_exec_date_time",
          "parameterType":  "type": "STRING" ,
          "parameterValue":  "value": dag_exec_date_time
      
  ],


sql查询

select cast(@dag_exec_date_time as timestamp) as airflow_timestamp

【讨论】:

【参考方案2】:

要运行parameterized query,代码如下所示:

sql = "select id from table where id = @days"
job_config = bigquery.QueryJobConfig()
job_config.query_parameters = [bigquery.ScalarQueryParameter('days','INT64',params.number_days)]
query_job = bigquery.Client().query(command=sql, job_config=job_config)
results = query_job.result()
if query_job.errors:
    # log all errors then raise first error
    for e in query_job.errors:
        logging.error(e)
    raise Exception(query_job.errors[0])

Bigquery 不是普通的关系数据库。没有“计划缓存”或其他优化可以使参数化查询更好。因此,除非存在某种可能是 SQL 注入的不安全输入,否则我建议不要使用参数并将值放入 SQL 中,因为它非常简单:

sql = f"select id from table where id = params.number_days"
query_job = bigquery.Client().query(command=sql)
...

【讨论】:

我认为您的回答脱离了 Apache-Airflow 的上下文。我相信他想将 Airflow 提供的 Jinja 模板用于参数化查询。 params 是 Airflow 传递给每个操作员的字典的名称。 @Luis 你说得对,这段代码在气流之外有效。我也在气流中使用它。

以上是关于Airflow 中的 BigQuery 参数化查询的主要内容,如果未能解决你的问题,请参考以下文章

Google BigQuery 中的参数化查询错误

IN()表达式中的nodejs bigquery参数化查询

使用 Airflow 将 Bigquery 查询结果发送到数据框

如何获取参数化 BigQuery 查询的控制台视图?

如何设置 Airflow DAG 权限以查询基于 Google Sheets 文档构建的 BigQuery 表?

如何安全地为 bigquery 节点插入转义用户输入?可以在 bigquery.insert 节点库上使用参数化查询吗?