BigQuery:将数据导出到分层文件夹:YYYY/MM/DD

Posted

技术标签:

【中文标题】BigQuery:将数据导出到分层文件夹:YYYY/MM/DD【英文标题】:BigQuery: Export data into hierarchical folders: YYYY/MM/DD 【发布时间】:2019-10-21 14:12:42 【问题描述】:

我想导出 BigQuery 中的一个日期分区表。我想将其导出,以便每天的数据最终存储在不同的文件中。例如,对于具有嵌套文件夹结构(如 gs://my-bucket/YYYY/MM/DD/)的 GS 存储桶。这可能吗?

请不要告诉我我需要为每天的数据运行单独的导出作业:我知道这是可能的,但导出多年的数据时会很痛苦,因为您需要运行数千个导出作业。

在导入方面,这可以通过镶木地板格式实现。

如果直接使用 BigQuery 无法做到这一点,是否有像 dataproc 或 dataflow 这样的 GCS 工具可以简化此操作(链接到实际执行此导出的脚本的奖励积分)。

【问题讨论】:

您目前有一些示例代码用于导出单个表吗? BigQuery 目前似乎不支持此功能,因此我创建了一个功能请求 here。如果您对此功能感兴趣,请启动它。 @Ben P 我在下面发布了我过去用来执行此操作的代码。 【参考方案1】:

带有bq extract 的 bash 脚本可以工作吗?

#!/bin/bash

# Stop on first error
set -e;

# Used for Bigquery partitioning (to distinguish from bash variable reference)
DOLLAR="\$"

# -I ISO DATE
# -d FROM STRING
start=$(date -I -d 2019-06-01) || exit -1
end=$(date -I -d 2019-06-15)   || exit -1

d=$start

# string(d) <= string(end)
while [[ ! "$d" > "$end" ]]; do
    YYYYMMDD=$(date -d $d +"%Y%m%d")
    YYYY=$(date -d $d +"%Y")
    MM=$(date -d $d +"%m")
    DD=$(date -d $d +"%d")

    # print current date
    echo $d

    cmd="bq extract --destination_format=AVRO \
    'project:dataset.table$DOLLAR$YYYYMMDD' \
    'gs://my-bucket/$YYYY/$MM/$DD/part*.avro'
    "

    # execute    
    eval $cmd

    # d++
    d=$(date -I -d "$d + 1 day")
done

也许您应该在https://issuetracker.google.com/savedsearches/559654 申请新功能。

不是 bash 忍者,所以请确保有更酷的方法来比较日期。

【讨论】:

感谢您的建议,但正如我在问题中指出的那样,我已经有了一个解决方案,可以通过大量日常导出工作来解决这个问题。那里的问题是,当您踢出数千个这些时,您需要监视每个并并行运行它们,这比普通用户想要处理的复杂性更高。我创建了一个功能请求here。 目前不可能,正如您所说,您可以选择在第一个错误模式(我的解决方案)上停止的顺序或使用-sync,--[no]synchronous_mode 模式与监视/记录并行(可能是默认模式)使用 API 标记)。【参考方案2】:

应@Ben P 的要求,这是我之前用来并行运行大量导出作业的解决方案(python 脚本)。这是相当粗略的代码,应该通过在运行后检查每个导出作业的状态以查看它是否成功来改进。

我不会接受这个答案,因为这个问题正在寻找执行此任务的 bigquery-native 方式。

请注意,此脚本用于导出版本化的数据集,因此围绕着许多用户可能不需要的一些额外逻辑。它假定输入表和输出文件夹名称都使用该版本。这应该很容易去掉。

import argparse
import datetime as dt
from google.cloud import bigquery
from multiprocessing import Pool
import random
import time

GCS_EXPORT_BUCKET = "YOUR_BUCKET_HERE"
VERSION = "dataset_v1"

def export_date(export_dt, bucket=GCS_EXPORT_BUCKET, version=VERSION):
    table_id = '$:%Y%m%d'.format(version, export_dt)
    gcs_filename = '/:%Y/%m/%d/-*.jsonlines.gz'.format(version, export_dt, table_id)
    gcs_path = 'gs:///'.format(bucket, gcs_filename)
    job_id = export_data_to_gcs(table_id, gcs_path, 'currents')
    return (export_dt, job_id)

def export_data_to_gcs(table_id, destination_gcs_path, dataset):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset)
    table_ref = dataset_ref.table(table_id)
    job_config = bigquery.job.ExtractJobConfig()
    job_config.destination_format = 'NEWLINE_DELIMITED_JSON'
    job_config.compression = 'GZIP'
    job_id = 'export--:%Y%m%d%H%M%S'.format(table_id.replace('$', '--'),
                                                dt.datetime.utcnow())
    # Add a bit of jitter
    time.sleep(5 * random.random())
    job = bigquery_client.extract_table(table_ref,
                                        destination_gcs_path,
                                        job_config=job_config,
                                        job_id=job_id)
    print(f'Now running job_id job_id')
    time.sleep(50)
    job.reload()
    while job.running():
        time.sleep(10)
        job.reload()
    return job_id



if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument('-s', "--startdate",
                        help="The Start Date - format YYYY-MM-DD (Inclusive)",
                        required=True,
                        type=dt.date.fromisoformat)
    parser.add_argument('-e', "--enddate",
                        help="The End Date format YYYY-MM-DD (Exclusive)",
                        required=True,
                        type=dt.date.fromisoformat)

    args = parser.parse_args()

    start_date = args.startdate
    end_date = args.enddate

    dates = []
    while start_date < end_date:
        dates.append(start_date)
        start_date += dt.timedelta(days=1)

    with Pool(processes=30) as pool:
        jobs = pool.map(export_date, dates, chunksize=1)

要运行此代码,请将其放入名为 bq_exporter.py 的文件中,然后运行 ​​python bq_exporter.py -s 2019-01-01 -e 2019-02-01。这将导出 2019 年 1 月,并打印每个导出作业的 ID。您可以通过 bq show -j JOB_ID 使用 BigQuery CLI 检查作业的状态。

【讨论】:

以上是关于BigQuery:将数据导出到分层文件夹:YYYY/MM/DD的主要内容,如果未能解决你的问题,请参考以下文章

GCP将数据作为字符串从GCS中的CSV文件加载到BigQuery表中

无法将 Google BigQuery 导出到本地计算机中的 CSV 文件

谷歌分析 Bigquery 导出

如何限制从 bigquery 导出到 gcs 的文件大小?

如何将所有事件数据从 Firebase 导出到 BigQuery?

如何将数据框从 Cloud Datalab 导出到 BigQuery 表?