运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶

Posted

技术标签:

【中文标题】运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶【英文标题】:Run a BigQuery query and write the data into cloud storage bucket in parquet using airflow 【发布时间】:2021-11-24 09:04:18 【问题描述】:

我正在尝试创建一个 DAG,它将从 BigQuery 查询中提取数据并以 parquet 格式写入 gcs 存储桶。我查看了this question 并在这里得到了一些帮助。它建议使用BigQueryOperator 执行查询,然后使用BigQueryToCloudStorageOperator 写入gcs 存储桶。使用这种方法,我必须首先将查询结果写入表中,然后从该表中写入 gcs 存储桶。

分两步:

bq_query = bigquery_operator.BigQueryOperator(
    task_id='bq_query',
    sql="""
        <select query with filters>
        """.format(date=date1),
        use_legacy_sql=False,
        destination_dataset_table=<table_name>
        location="southamerica-east1",
        write_disposition="WRITE_EMPTY",
        create_disposition="CREATE_IF_NEEDED")



export_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_to_gcs',
    source_project_dataset_table=destination_dataset_table,
    destination_cloud_storage_uris=[output_file],
    export_format='PARQUET')

有没有一种方法可以直接将大查询数据写入 gcs 存储桶而无需先写入表?我相信直接导出是可能的,但我正在寻找使用过滤器运行查询然后写入 gcs。

【问题讨论】:

【参考方案1】:

有可能,运营商为我们提供了一种固定的做事方式,它并不总是最佳的,但它可以节省时间。因此,其中一种方法是使用 python 运算符,该运算符具有检索 bigquery 数据并将输出上传到存储的功能。

Python 运算符

task = PythonOperator(
        task_id='get_data_and_upload',
        python_callable=get_bigquery_data,
        op_kwargs='custom_date': date
      )

函数 get_bigquery_data

# Libraries to use 
from google.cloud import bigquery, storage 
from google.oauth2 import service_account
import pandas as pd

# function
def get_data_and_upload(custom_date):
    # Construct a BigQuery client object.
    key_path = "path/to/service_account.json"

    credentials = service_account.Credentials.from_service_account_file(
        key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )

    client = bigquery.Client(credentials=credentials, 
    project=credentials.project_id,)
    
    #for this sample im just print it and not using it.
    print(custom_date)

    query = """
        SELECT name, SUM(number) as total_people
        FROM `bigquery-public-data.usa_names.usa_1910_2013`
        WHERE state = 'TX'
        GROUP BY name, state
        ORDER BY total_people DESC
        LIMIT 20
    """
    query_job = client.query(query)  
    output = []

    # build our result object
    for row in query_job:
        output.append('name':row[0],'total_people':row[1])

    # move to a dataframe. I use pandas for parquet conversion
    df = pd.DataFrame(output)
    bobject = df.to_parquet(path=None,compression='gzip')

    # upload file object to google cloud storage
    bucket_name = "my-bucket-name" 
    destination_blob_name = "parquet_files/parquet_file.gzip" 

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_string(bobject) 

    print("File uploaded to .".format(destination_blob_name))

此外,以上示例都是一步完成的。它有效,但通常不好看,太死板了。请记住,您可以创建您认为合适的 python 函数(一个用于获取数据,一个用于转换数据,一个用于将数据推送到实际存储中)。如果你的airflow版本在2.0以上,可以使用taskflowapi传参。如果您的版本低于该版本,则必须使用XCOM

我已经测试了该功能,我认为由于您自己的环境限制或版本控制,您应该小心地将其转换为您的气流安装,因为您可能需要更新代码或使用不同的库来获得相同的输出。

其他选项是创建您自己的custom operator。

这里有一些有用的链接:

Python Operator Passing parameters between operators using taskflowapi Bigquery Python Client Bigquery authentication Pandas to_parquet Uploading a file using Cloud Storage Python Client

【讨论】:

以上是关于运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶的主要内容,如果未能解决你的问题,请参考以下文章

优化将BigQuery的数据传输到MongoDB的气流任务

如何在另一个任务气流中使用查询结果(bigquery 运算符)

引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败

使用 Google Composer 运行 Bigquery 查询

Airflow 中的 BigQuery 参数化查询

如何使用 Apps 脚本运行不将结果写入表的 BigQuery 作业?