使用 Pandas 界面恢复上传到 BigQuery
Posted
技术标签:
【中文标题】使用 Pandas 界面恢复上传到 BigQuery【英文标题】:Resuming Upload to BigQuery using the Pandas interface 【发布时间】:2019-03-25 19:49:26 【问题描述】:我正在使用BigQuery python API 和BigQuery connector for Pandas。
每次我 append
到 BigQuery 中的数据集时,我都想确保从上次离开的地方开始,以防止数据重复和丢失。
Load Job Config 或其他地方是否有设置可以自动执行此操作?如果不是,你建议我如何处理连接错误和上传重连,同时防止数据重复?我知道我可以查询最后一行并据此追加数据,但我不想查询,因为 BigQuery 会收取查询费用。
这是我目前上传到 BigQuery 的内容:
import pandas as pd
from google.cloud import bigquery, exceptions
test_df = pd.DataFrame(
'num_legs': [2, 4, 8, 0],
'num_wings': [2, 0, 0, 0],
'num_specimen_seen': [10, 2, 1, 8],
'names': ['falcon', 'dog', 'spider', 'fish']
)
project = "test-project"
dataset_id = "test-dataset"
table_id = "test-table"
client = bigquery.Client()
try:
dataset_ref = client.dataset(dataset_id=dataset_id, project=project)
dataset = client.get_dataset(dataset_ref)
except exceptions.NotFound:
print("specified dataset not found! -- creating a new dataset.")
dataset = client.create_dataset(dataset_id)
table_ref = dataset.table(table_id)
load_job = client.load_table_from_dataframe(
dataframe=test_df, destination=table_ref, project=project)
load_job.result()
【问题讨论】:
【参考方案1】:假设,您可以在源数据上生成唯一的 load-batch-id。
实现目标的一种方法是:
使用 load_batch_id 标记您的加载作业
Client.load_table_from_dataframe 有参数:job_config(google.cloud.bigquery.job.LoadJobConfig,可选)
将 load_batch_id 注入 LoadJobConfig.labels(Dict[str, str] – 作业的标签。)
当您需要确认一个加载作业是否成功时,使用job.list api 搜索带有标签load_batch_id的作业。
一种简化的情况是,例如,您每天只将数据加载到 BQ,并且您的源数据可以按日期分组。然后,您的 20190325 python 脚本首先检查标记为 20190324 的作业(或更深入地了解过去),看看它是否需要重试。
【讨论】:
以上是关于使用 Pandas 界面恢复上传到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章
使用 Pandas 或命令行上传到 BigQuery 时出现奇怪的重复字段错误。所有字段唯一
由于 InvalidSchema 错误,将 Pandas 上传到 BigQuery 失败
Pandas,使用 merge_cells=False 从 excel 恢复多索引