如何在气流中配置 Google Cloud BigQuery

Posted

技术标签:

【中文标题】如何在气流中配置 Google Cloud BigQuery【英文标题】:How to configure Google Cloud BigQuery in airflow 【发布时间】:2021-09-15 15:44:05 【问题描述】:

我正在尝试在我的 Apache Airflow 中添加一个连接以连接到谷歌云,以便使用 BigQueryHook,目前,我的服务帐户 json 文件存储在airflow-projects -> dags -> keys。从Admin -> Connections 中的添加连接,我已经指定 Conn Id = bigquery_defaultConn Type = Google CloudKeyfile Path=/keys/serviceKey.json。当我运行我的 dag 时,我收到了一个错误

FileNotFoundError: [Errno 2] 没有这样的文件或目录:'/keys/serviceKey.json'

我尝试更改 Keyfile Path=/dags/keys/serviceKey.json* 但仍然收到 FileNotFoundError。我错过了什么?

def get_data_from_bq(**kwargs):
    hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute('SELECT owner_display_name, title, view_count FROM `bigquery-public-data.***.posts_questions` WHERE creation_date > "2020-09-09" ORDER BY view_count DESC LIMIT 2')
    result = cursor.fetchall()
    print('result', result)
    return result

【问题讨论】:

最简单的解决方案是将完整的 JSON 粘贴到 Keyfile JSON 部分。那行得通 【参考方案1】:

您必须确保您的密钥在所有工作人员的特定路径中都可用。当您的工作人员运行时,您应该检查密钥的路径是什么。您的 dags 通常位于 $AIRFLOW_HOME/dags 中,因此您需要检查 $AIRFLOW_HOME 指向的内容并设置绝对路径。

但是,这可能不是最好的身份验证方式,如果您使用 GCP/GKE 来运行您的气流工作程序,最好使用类似工作负载身份的东西:例如,https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity。另一种选择是为您的工作人员设置 ENV 变量或使用虚拟机凭据https://cloud.google.com/docs/authentication/best-practices-applications - 在所有情况下,如果您未在挂钩中指定任何凭据,将使用这些默认凭据。

【讨论】:

以上是关于如何在气流中配置 Google Cloud BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Google Cloud SQL 与 Google Big Query 集成

将文件从 Google Cloud 自动上传到 Big Query

javascript Google Cloud函数用于在pub / sub和pub / sub到Big查询中发布数据

将存储在 Google Cloud Storage 中的文件加载到 Big Query 时出错

将数据流从 Google Cloud Storage 流式传输到 Big Query

气流.providers 和气流.contrib 之间的差异