如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?

Posted

技术标签:

【中文标题】如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?【英文标题】:How to pass use_legacy_sql=False to the SqlSensor in an Airflow DAG with a BigQuery connection? 【发布时间】:2019-05-02 12:25:15 【问题描述】:

自 Apache Airflow 1.9 发布以来,我一直在使用自己的自定义 SqlSensor,因为我无法使用在 Google BigQuery 上运行的标准 SQL 语句中包含的那个,因为默认值为 use legacy SQL。

我检查了最近的 1.10.3 版本,似乎情况仍然如此。除了使用我自己的 SQL 传感器作为插件之外,还有其他方法可以完成这项工作吗?

【问题讨论】:

【参考方案1】:

更新您的自定义传感器以将 use_legacy_sql=False 传递给 BigQueryHook。

hook = BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to,
            use_legacy_sql=False
       )

【讨论】:

这正是我目前正在做的,但我想知道是否有办法在没有插件的情况下做到这一点。 你不需要用插件来做。您已经拥有自定义传感器,对吧?您可以将此代码添加到该传感器。 自定义传感器是一个插件,它是我必须维护的代码,我想避免这种情况。【参考方案2】:

我找到的最快的解决方案是

定义一个新类 BigQuerySqlSensor 覆盖_get_hook 方法 在覆盖中设置use_legacy_sql=False 返回更新后的钩子
from airflow.sensors.sql_sensor import SqlSensor

class BigQuerySqlSensor(SqlSensor):
    def _get_hook(self):
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        return hook

sense_stuff = BigQuerySqlSensor(
        dag=dag,
        task_id='sense_stuff',
        conn_id='the_connection_id',
        sql="SELECT COUNT(*) FROM some_table",
        mode='reschedule',
        poke_interval=600,
        timeout=(3600)
    )

【讨论】:

这就是我所说的“我正在使用我自己的自定义 SqlSensor”;)【参考方案3】:

从文档中的enabling standard SQL主题,如果不能直接设置选项,另一种方法是使用#standardSQL shebang,例如:

#standardSQL
SELECT x
FROM UNNEST([1, 2, 3]) AS x;

应该可以使用此前缀提交查询以覆盖设置。

【讨论】:

这不起作用,因为它正在向 API 发送矛盾的设置:"Query text specifies use_legacy_sql:false, while API options specify:true"

以上是关于如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 BigQuery 连接器将自定义查询从谷歌数据工作室传递到 BigQuery?

如何将日期转换为 CDT 并使用 BigQuery 制作连接字符串?

如何将表 1 上的结构数组与 BigQuery 中表 2 的普通列连接起来

Google BigQuery:如何使用 gsutil 删除或覆盖表?

Google BigQuery Spark 连接器:如何在追加时忽略未知值

BigQuery 数据连接器:如何按字段值限制可访问数据