使用 Pandas 界面恢复上传到 BigQuery

Posted

技术标签:

【中文标题】使用 Pandas 界面恢复上传到 BigQuery【英文标题】:Resuming Upload to BigQuery using the Pandas interface 【发布时间】:2019-03-25 19:49:26 【问题描述】:

我正在使用BigQuery python API 和BigQuery connector for Pandas。

每次我 append 到 BigQuery 中的数据集时,我都想确保从上次离开的地方开始,以防止数据重复和丢失。

Load Job Config 或其他地方是否有设置可以自动执行此操作?如果不是,你建议我如何处理连接错误和上传重连,同时防止数据重复?我知道我可以查询最后一行并据此追加数据,但我不想查询,因为 BigQuery 会收取查询费用。

这是我目前上传到 BigQuery 的内容:

import pandas as pd
from google.cloud import bigquery, exceptions

test_df = pd.DataFrame(
    'num_legs': [2, 4, 8, 0],
    'num_wings': [2, 0, 0, 0],
    'num_specimen_seen': [10, 2, 1, 8],
    'names': ['falcon', 'dog', 'spider', 'fish']
)
project = "test-project"
dataset_id = "test-dataset"
table_id = "test-table"
client = bigquery.Client()
try:
    dataset_ref = client.dataset(dataset_id=dataset_id, project=project)
    dataset = client.get_dataset(dataset_ref)
except exceptions.NotFound:
    print("specified dataset not found! -- creating a new dataset.")
    dataset = client.create_dataset(dataset_id)

table_ref = dataset.table(table_id)
load_job = client.load_table_from_dataframe(
    dataframe=test_df, destination=table_ref, project=project)
load_job.result()

【问题讨论】:

【参考方案1】:

假设,您可以在源数据上生成唯一的 load-batch-id

实现目标的一种方法是:

    使用 load_batch_id 标记您的加载作业

    Client.load_table_from_dataframe 有参数:job_config(google.cloud.bigquery.job.LoadJobConfig,可选)

    将 load_batch_id 注入 LoadJobConfig.labels(Dict[str, str] – 作业的标签。)

    当您需要确认一个加载作业是否成功时,使用job.list api 搜索带有标签load_batch_id的作业。

一种简化的情况是,例如,您每天只将数据加载到 BQ,并且您的源数据可以按日期分组。然后,您的 20190325 python 脚本首先检查标记为 20190324 的作业(或更深入地了解过去),看看它是否需要重试。

【讨论】:

以上是关于使用 Pandas 界面恢复上传到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pandas 或命令行上传到 BigQuery 时出现奇怪的重复字段错误。所有字段唯一

由于 InvalidSchema 错误,将 Pandas 上传到 BigQuery 失败

Pandas,使用 merge_cells=False 从 excel 恢复多索引

如何显示带有我使用 Dash 上传组件上传的文件的 pandas 数据框?

hibernate如何实现数据备份和数据恢复

支持恢复将大文件上传到 S3