运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶
Posted
技术标签:
【中文标题】运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶【英文标题】:Run a BigQuery query and write the data into cloud storage bucket in parquet using airflow 【发布时间】:2021-11-24 09:04:18 【问题描述】:我正在尝试创建一个 DAG,它将从 BigQuery 查询中提取数据并以 parquet 格式写入 gcs 存储桶。我查看了this question 并在这里得到了一些帮助。它建议使用BigQueryOperator
执行查询,然后使用BigQueryToCloudStorageOperator
写入gcs 存储桶。使用这种方法,我必须首先将查询结果写入表中,然后从该表中写入 gcs 存储桶。
分两步:
bq_query = bigquery_operator.BigQueryOperator(
task_id='bq_query',
sql="""
<select query with filters>
""".format(date=date1),
use_legacy_sql=False,
destination_dataset_table=<table_name>
location="southamerica-east1",
write_disposition="WRITE_EMPTY",
create_disposition="CREATE_IF_NEEDED")
export_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
task_id='export_to_gcs',
source_project_dataset_table=destination_dataset_table,
destination_cloud_storage_uris=[output_file],
export_format='PARQUET')
有没有一种方法可以直接将大查询数据写入 gcs 存储桶而无需先写入表?我相信直接导出是可能的,但我正在寻找使用过滤器运行查询然后写入 gcs。
【问题讨论】:
【参考方案1】:有可能,运营商为我们提供了一种固定的做事方式,它并不总是最佳的,但它可以节省时间。因此,其中一种方法是使用 python 运算符,该运算符具有检索 bigquery 数据并将输出上传到存储的功能。
Python 运算符
task = PythonOperator(
task_id='get_data_and_upload',
python_callable=get_bigquery_data,
op_kwargs='custom_date': date
)
函数 get_bigquery_data
# Libraries to use
from google.cloud import bigquery, storage
from google.oauth2 import service_account
import pandas as pd
# function
def get_data_and_upload(custom_date):
# Construct a BigQuery client object.
key_path = "path/to/service_account.json"
credentials = service_account.Credentials.from_service_account_file(
key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials,
project=credentials.project_id,)
#for this sample im just print it and not using it.
print(custom_date)
query = """
SELECT name, SUM(number) as total_people
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = 'TX'
GROUP BY name, state
ORDER BY total_people DESC
LIMIT 20
"""
query_job = client.query(query)
output = []
# build our result object
for row in query_job:
output.append('name':row[0],'total_people':row[1])
# move to a dataframe. I use pandas for parquet conversion
df = pd.DataFrame(output)
bobject = df.to_parquet(path=None,compression='gzip')
# upload file object to google cloud storage
bucket_name = "my-bucket-name"
destination_blob_name = "parquet_files/parquet_file.gzip"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(bobject)
print("File uploaded to .".format(destination_blob_name))
此外,以上示例都是一步完成的。它有效,但通常不好看,太死板了。请记住,您可以创建您认为合适的 python 函数(一个用于获取数据,一个用于转换数据,一个用于将数据推送到实际存储中)。如果你的airflow版本在2.0以上,可以使用taskflowapi
传参。如果您的版本低于该版本,则必须使用XCOM
。
我已经测试了该功能,我认为由于您自己的环境限制或版本控制,您应该小心地将其转换为您的气流安装,因为您可能需要更新代码或使用不同的库来获得相同的输出。
其他选项是创建您自己的custom operator。
这里有一些有用的链接:
Python Operator Passing parameters between operators using taskflowapi Bigquery Python Client Bigquery authentication Pandas to_parquet Uploading a file using Cloud Storage Python Client【讨论】:
以上是关于运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶的主要内容,如果未能解决你的问题,请参考以下文章
如何在另一个任务气流中使用查询结果(bigquery 运算符)
引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败