使用 Airflow 将 Bigquery 查询结果发送到数据框

Posted

技术标签:

【中文标题】使用 Airflow 将 Bigquery 查询结果发送到数据框【英文标题】:Bigquery query result to dataframe with Airflow 【发布时间】:2019-10-22 14:07:30 【问题描述】:

我正在尝试从 bigquery 查询数据并使用 Airflow 将其写入数据帧。但它要么给出file not found(服务帐户密钥)或file name is too longeof line read错误。

我也尝试过使用钩子,但我无法将密钥文件作为 json 文件,因为它说它太长了。

关于如何实现它的任何提示?

def get_data_from_GBQ():

global customer_data
ofo_cred = Variable.get("ofo_cred")
logging.info(ofo_cred)
logging.info("Variable is here")
customer_data_query = """ SELECT FirstName, LastName, Organisation FROM `bigquery-bi.ofo.Customers` LIMIT 2 """
logging.info("test")

# Creating a connection to the google bigquery
client = bigquery.Client.from_service_account_json(ofo_cred)
logging.info("after client")
customer_data = client.query(customer_data_query).to_dataframe()
logging.info("after client")
print(customer_data)

dag = DAG(
'odoo_gbq_connection',
default_args=default_args,
description='A connection between ',
schedule_interval=timedelta(days=1),)

错误是:

FileNotFoundError: [Errno 2] No such file or directory: '\r\n  "type": "service_account",\r\n  "project_id":...

【问题讨论】:

我很好奇您为什么会看到 3 种不同类型的错误消息。您能否为您的问题添加更多细节? 很高兴看到您的 DAG 代码。请准备最小的例子 @YunZhang 我没有看到他们三个在一起。每次,当我尝试新事物时,都会抛出不同的错误:( @ArtemVovsia 我已经更新了。你能检查一下吗?这是一个完全正常的日志变量,但是在授权期间,它会抛出上面的错误。我不知道为什么。 您使用的是开源 Airflow 还是 Cloud Composer? 【参考方案1】:

bigquery.Client.from_service_account_json 函数需要服务帐户文件的文件名,您向它提供该文件的内容,因此它会尝试查找路径以\r\n "type": "servi... 开头的文件,但它会以FileNotFound 失败。

可能的修复:

client = bigquery.Client.from_service_account_json(path_to_ofo_cred)

https://googleapis.dev/python/google-api-core/latest/auth.html#service-accounts

【讨论】:

以上是关于使用 Airflow 将 Bigquery 查询结果发送到数据框的主要内容,如果未能解决你的问题,请参考以下文章

将 Airflow(版本 1.10.5)与 Bigquery 连接

Airflow BigQuery Hook - 通过 run_query 运行更新查询

将 BigQuery 查询结果行写入 csv 文件时,某些记录重复

Airflow 中是不是有操作员可以根据 BigQuery 中的查询创建表?

Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据

如何设置 Airflow DAG 权限以查询基于 Google Sheets 文档构建的 BigQuery 表?