如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?
Posted
技术标签:
【中文标题】如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?【英文标题】:How to pass use_legacy_sql=False to the SqlSensor in an Airflow DAG with a BigQuery connection? 【发布时间】:2019-05-02 12:25:15 【问题描述】:自 Apache Airflow 1.9 发布以来,我一直在使用自己的自定义 SqlSensor,因为我无法使用在 Google BigQuery 上运行的标准 SQL 语句中包含的那个,因为默认值为 use legacy SQL。
我检查了最近的 1.10.3 版本,似乎情况仍然如此。除了使用我自己的 SQL 传感器作为插件之外,还有其他方法可以完成这项工作吗?
【问题讨论】:
【参考方案1】:更新您的自定义传感器以将 use_legacy_sql=False
传递给 BigQueryHook。
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to,
use_legacy_sql=False
)
【讨论】:
这正是我目前正在做的,但我想知道是否有办法在没有插件的情况下做到这一点。 你不需要用插件来做。您已经拥有自定义传感器,对吧?您可以将此代码添加到该传感器。 自定义传感器是一个插件,它是我必须维护的代码,我想避免这种情况。【参考方案2】:我找到的最快的解决方案是
定义一个新类 BigQuerySqlSensor 覆盖_get_hook
方法
在覆盖中设置use_legacy_sql=False
返回更新后的钩子
from airflow.sensors.sql_sensor import SqlSensor
class BigQuerySqlSensor(SqlSensor):
def _get_hook(self):
hook = super()._get_hook()
hook.use_legacy_sql = False
return hook
sense_stuff = BigQuerySqlSensor(
dag=dag,
task_id='sense_stuff',
conn_id='the_connection_id',
sql="SELECT COUNT(*) FROM some_table",
mode='reschedule',
poke_interval=600,
timeout=(3600)
)
【讨论】:
这就是我所说的“我正在使用我自己的自定义 SqlSensor”;)【参考方案3】:从文档中的enabling standard SQL主题,如果不能直接设置选项,另一种方法是使用#standardSQL
shebang,例如:
#standardSQL
SELECT x
FROM UNNEST([1, 2, 3]) AS x;
应该可以使用此前缀提交查询以覆盖设置。
【讨论】:
这不起作用,因为它正在向 API 发送矛盾的设置:"Query text specifies use_legacy_sql:false, while API options specify:true"
以上是关于如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 BigQuery 连接器将自定义查询从谷歌数据工作室传递到 BigQuery?
如何将日期转换为 CDT 并使用 BigQuery 制作连接字符串?
如何将表 1 上的结构数组与 BigQuery 中表 2 的普通列连接起来
Google BigQuery:如何使用 gsutil 删除或覆盖表?