BigQuery Python API copy_table 复制架构但不复制数据
Posted
技术标签:
【中文标题】BigQuery Python API copy_table 复制架构但不复制数据【英文标题】:BigQuery Python API copy_table copies schema but not data 【发布时间】:2020-06-27 15:12:37 【问题描述】:我正在尝试将一个 BigQuery 表复制到同一数据集中的另一个表中,使用 https://cloud.google.com/bigquery/docs/managing-tables#copy-table 的示例
我创建了一个函数来进行复制,如下:
def copy_table (source_table,dest_table):
client = bigquery.Client()
source_table_ref="my_project.my_dataset."+source_table
dest_table_ref="my_project.my_dataset."+dest_table
job = client.copy_table(
source_table_ref,
dest_table_ref) # API request
job.result()
但是当我进行复制时,会创建 dest_table,其架构与 source_table 相同,但没有数据从 source_table 复制到 dest_table。
这是我正在做的事情的总体顺序:
-
创建源表
在 source_table 中插入行
执行查询以检查行是否在 source_table 中(它们是 -- SELECT COUNT(*) 返回正确的行数)
使用上面的函数将source_table复制到dest_table
执行查询以检查行是否在 dest_table 中(它们不是 - SELECT COUNT(*) 返回零行)
我的猜测是这些作业以某种方式异步执行,但我不知道如何使它们同步执行。任何想法将不胜感激。
如果有帮助,我的总体目标是使用每日批处理作业的结果创建并填充一个新表(例如 get_user_info_2020-06-27),然后将其复制到始终包含当前的 get_user_info_current 表当天的用户信息。
编辑:
更多信息,基于测试:
在https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability 的页面上,它说:“数据最多可能需要 90 分钟才能用于复制操作”。所以,我在插入语句之后编写了一个小东西来等待它完成:
def insert_table_wait(table_name,prev_rows,rows_inserted):
client = bigquery.Client()
table_id = "pacs-user-analysis-dev.google_users."+table_name
table = client.get_table(table_id) # Make an API request.
#wait until the insert fully completes
curr_table=client.get_table(table_id)
sys.stderr.write(str(datetime.datetime.now()) +" "+table_name +" properties: "+str(curr_table._properties)+"\n")
curr_rows=int(curr_table._properties.get('numRows'))
while curr_table._properties.get('streamingBuffer') is not None or curr_rows != prev_rows+rows_inserted:
sys.stderr.write(str(datetime.datetime.now()) +" Waiting for insert into "+str(curr_table._properties.get('id'))+" to complete. StreamingBuffer details: "+str(curr_table._properties.get('streamingBuffer'))+" prev_rows: "+str(prev_rows)+" curr_rows: "+str(curr_rows)+ " should be: " + str(prev_rows+rows_inserted)+"\n")
time.sleep(10)
curr_table=client.get_table(table_id)
curr_rows=int(curr_table._properties.get('numRows') )
我希望这可以解决问题。我不明白的是,新行几乎立即出现在 BigQuery 控制台 UI 中,但 table._properties.get('numRows') 似乎没有及时更新。
【问题讨论】:
我试图重现您的问题,但对我来说效果很好。您能否让我知道您的数据是否会在一段时间后插入到目标表中? 谢谢@rmesteves。不,它不会在一段时间后(周末)显示出来,即使测试插入只有 7 行。 您是在插入后复制数据吗?所以也许这就是问题所在。如果您在这方面有任何进展,请告诉我 @rmesteves:是的,这是由于插入后立即复制所致。我在编辑的问题中添加的“insert_table_wait”函数似乎确实可以通过等待插入完全完成来解决问题。 (虽然这确实需要很长时间——通常是 75-90 分钟——才能完成。痛苦!)我欢迎任何关于替代品的建议。奇怪的是,可以在插入完成后立即查询表中插入的行,而不是复制该表。耸耸肩。 我有一个建议给你。我将直接发布它作为答案,因为它很难在 cmets 中发布代码。我们可以在答案中讨论它是否达到您的预期行为 【参考方案1】:由于流缓冲区中的数据存在copy jobs
的限制,我建议您使用query job
并设置目标表,如下面的代码:
from google.cloud import bigquery
client = bigquery.Client(project = "your-project")
job_config = bigquery.QueryJobConfig(destination="destination-table-id")
# SELECT * to copy the whole table
sql = "SELECT * FROM <source_table>"
query_job = client.query(sql, job_config=job_config)
query_job.result()
如果对你有帮助,请告诉我
【讨论】:
非常感谢!我会试一试并报告。 伟大的@IanCrew。如果最终这个答案对您有帮助,请考虑接受或投票:)以上是关于BigQuery Python API copy_table 复制架构但不复制数据的主要内容,如果未能解决你的问题,请参考以下文章
通过 Python API 客户端将经过验证的查询发送到 BigQuery 时出现语法错误
使用 python 和 BigQuery API 获取 BigQuery 数据集中的表列表
使用 Python 和 API 创建 Bigquery 分区表