使用 python 将历史数据从谷歌云存储移动到日期分区的 bigquery 表

Posted

技术标签:

【中文标题】使用 python 将历史数据从谷歌云存储移动到日期分区的 bigquery 表【英文标题】:Moving historical data from google cloud storage to date-partitioned bigquery table using python 【发布时间】:2017-01-26 19:27:35 【问题描述】:

我需要在 google bigquery 中将大量历史数据组织到日期分区中。它将为您划分加载日期(仅限当前日期),但这对历史数据并没有真正的帮助。到目前为止,我看到的唯一解决方案是使用日期标志为每个日期手动执行此操作,直到谷歌进一步构建该工具。有什么解决办法吗?

【问题讨论】:

我建议通过将其拆分为实际问题(并将其作为问题发布)来“修复”您发布的内容,然后(看起来您已经有了很好的答案)只需回答您自己的问题作为回答(不是问题的一部分)。你也可以阅读How to Ask。 我会附和这似乎是一个有用的帖子。作为自我回答问题的示例,请参阅***.com/questions/41638651/…。 感谢您的建议 :) 【参考方案1】:

我创建了自己的管道并将其包含在下面。要运行它,请将这篇文章中的所有代码块放在一起。

import datetime
from subprocess import call

start = datetime.date( year = 2016, month = 10, day = 24)
#end = datetime.date( year = 2016, month = 10, day = 01 )
end = datetime.date.today()
file_type = ['activity', 'click', 'impression', 'rich_media']       
dataset = 'dataset_name'

下面的脚本会将文件从一个 GCS 存储桶复制到另一个。 google 双击文件的文件名中有创建日期,此脚本使用该创建日期来确定将文件放置在哪个日期分区中。

#pull all the files into our own buckets on gcs so that we dont lose data    after 3 months
call("gsutil -m cp -r gs://bucket/path/to/mysource* gs://mybucket/path/to/mydata, shell=True)

这似乎是快速分发历史数据的最佳方式。我想打开单个文件并将每一行放入正确的分区中,但我不知道如何。

#create list of dates based on stat and end date supplied by user at begining of script
def daterange( start_date, end_date ):
    if start_date <= end_date:
        for n in range( ( end_date - start_date ).days + 1 ):
            yield start_date + datetime.timedelta( n )
    else:
        for n in range( ( start_date - end_date ).days + 1 ):
            yield start_date - datetime.timedelta( n )

我在底部添加了 try/except 用于错误处理,但我认为它实际上并没有做任何事情,因为调用永远不会出错,如果表名弄乱了,它将在服务器上创建一个作业错误结束,但实际上不会停止该过程或伤害任何东西。

使用 --nosync 标志允许我将 call 用于异步作业,最初我使用的是 popen 但我不认为 popen 会自行清理(我认为 call 可以?)所以这似乎是一个更好的选择。

#creates a bunch of jobs to upload GCS files into GBQ partitioned tables
for bucket in file_type:
    for date in daterange( start, end ):
        date = str(date).replace('-','')
        source = 'gs://mybucket/path/to/mydata' + '*'
        table = bucket + '$' + date
        try: 
            process = call("bq --nosync load --skip_leading_rows=1 --replace  dev."\
                           + table + ' ' + source, shell=True)
        except:
            print 'missing ' + bucket + ' data for ' + date

【讨论】:

以上是关于使用 python 将历史数据从谷歌云存储移动到日期分区的 bigquery 表的主要内容,如果未能解决你的问题,请参考以下文章

使用python从谷歌云存储桶中删除数据时出错

从谷歌云数据存储迁移到谷歌云 sql

将数据从谷歌数据存储复制到 CSV

从谷歌云存储获取状态 503

从谷歌云存储中的 csv 加载数据作为 bigquery 'in' 查询

Numpy 从谷歌云存储加载内存映射数组(mmap_mode)