使用 Airflow 将 Bigquery 查询结果发送到数据框
Posted
技术标签:
【中文标题】使用 Airflow 将 Bigquery 查询结果发送到数据框【英文标题】:Bigquery query result to dataframe with Airflow 【发布时间】:2019-10-22 14:07:30 【问题描述】:我正在尝试从 bigquery 查询数据并使用 Airflow 将其写入数据帧。但它要么给出file not found
(服务帐户密钥)或file name is too long
或eof line read
错误。
我也尝试过使用钩子,但我无法将密钥文件作为 json 文件,因为它说它太长了。
关于如何实现它的任何提示?
def get_data_from_GBQ():
global customer_data
ofo_cred = Variable.get("ofo_cred")
logging.info(ofo_cred)
logging.info("Variable is here")
customer_data_query = """ SELECT FirstName, LastName, Organisation FROM `bigquery-bi.ofo.Customers` LIMIT 2 """
logging.info("test")
# Creating a connection to the google bigquery
client = bigquery.Client.from_service_account_json(ofo_cred)
logging.info("after client")
customer_data = client.query(customer_data_query).to_dataframe()
logging.info("after client")
print(customer_data)
dag = DAG(
'odoo_gbq_connection',
default_args=default_args,
description='A connection between ',
schedule_interval=timedelta(days=1),)
错误是:
FileNotFoundError: [Errno 2] No such file or directory: '\r\n "type": "service_account",\r\n "project_id":...
【问题讨论】:
我很好奇您为什么会看到 3 种不同类型的错误消息。您能否为您的问题添加更多细节? 很高兴看到您的 DAG 代码。请准备最小的例子 @YunZhang 我没有看到他们三个在一起。每次,当我尝试新事物时,都会抛出不同的错误:( @ArtemVovsia 我已经更新了。你能检查一下吗?这是一个完全正常的日志变量,但是在授权期间,它会抛出上面的错误。我不知道为什么。 您使用的是开源 Airflow 还是 Cloud Composer? 【参考方案1】:bigquery.Client.from_service_account_json
函数需要服务帐户文件的文件名,您向它提供该文件的内容,因此它会尝试查找路径以\r\n "type": "servi...
开头的文件,但它会以FileNotFound
失败。
可能的修复:
client = bigquery.Client.from_service_account_json(path_to_ofo_cred)
https://googleapis.dev/python/google-api-core/latest/auth.html#service-accounts
【讨论】:
以上是关于使用 Airflow 将 Bigquery 查询结果发送到数据框的主要内容,如果未能解决你的问题,请参考以下文章
将 Airflow(版本 1.10.5)与 Bigquery 连接
Airflow BigQuery Hook - 通过 run_query 运行更新查询
将 BigQuery 查询结果行写入 csv 文件时,某些记录重复
Airflow 中是不是有操作员可以根据 BigQuery 中的查询创建表?