BigQuery Python API copy_table 复制架构但不复制数据

Posted

技术标签:

【中文标题】BigQuery Python API copy_table 复制架构但不复制数据【英文标题】:BigQuery Python API copy_table copies schema but not data 【发布时间】:2020-06-27 15:12:37 【问题描述】:

我正在尝试将一个 BigQuery 表复制到同一数据集中的另一个表中,使用 https://cloud.google.com/bigquery/docs/managing-tables#copy-table 的示例

我创建了一个函数来进行复制,如下:

def copy_table (source_table,dest_table):
    client = bigquery.Client()
    source_table_ref="my_project.my_dataset."+source_table
    dest_table_ref="my_project.my_dataset."+dest_table

    job = client.copy_table(
        source_table_ref,
        dest_table_ref)  # API request
    job.result()

但是当我进行复制时,会创建 dest_table,其架构与 source_table 相同,但没有数据从 source_table 复制到 dest_table。

这是我正在做的事情的总体顺序:

    创建源表 在 source_table 中插入行 执行查询以检查行是否在 source_table 中(它们是 -- SELECT COUNT(*) 返回正确的行数) 使用上面的函数将source_table复制到dest_table 执行查询以检查行是否在 dest_table 中(它们不是 - SELECT COUNT(*) 返回零行)

我的猜测是这些作业以某种方式异步执行,但我不知道如何使它们同步执行。任何想法将不胜感激。

如果有帮助,我的总体目标是使用每日批处理作业的结果创建并填充一个新表(例如 get_user_info_2020-06-27),然后将其复制到始终包含当前的 get_user_info_current 表当天的用户信息。

编辑:

更多信息,基于测试:

在https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability 的页面上,它说:“数据最多可能需要 90 分钟才能用于复制操作”。所以,我在插入语句之后编写了一个小东西来等待它完成:

def insert_table_wait(table_name,prev_rows,rows_inserted):
    client = bigquery.Client()
    table_id = "pacs-user-analysis-dev.google_users."+table_name
    table = client.get_table(table_id)  # Make an API request.

    #wait until the insert fully completes
    curr_table=client.get_table(table_id)
    sys.stderr.write(str(datetime.datetime.now()) +" "+table_name +" properties: "+str(curr_table._properties)+"\n")
    curr_rows=int(curr_table._properties.get('numRows'))
    while curr_table._properties.get('streamingBuffer') is not None or curr_rows != prev_rows+rows_inserted:
        sys.stderr.write(str(datetime.datetime.now()) +" Waiting for insert into "+str(curr_table._properties.get('id'))+" to complete. StreamingBuffer details: "+str(curr_table._properties.get('streamingBuffer'))+" prev_rows: "+str(prev_rows)+" curr_rows: "+str(curr_rows)+ " should be: " + str(prev_rows+rows_inserted)+"\n")
        time.sleep(10)
        curr_table=client.get_table(table_id)
        curr_rows=int(curr_table._properties.get('numRows') )

我希望这可以解决问题。我不明白的是,新行几乎立即出现在 BigQuery 控制台 UI 中,但 table._properties.get('numRows') 似乎没有及时更新。

【问题讨论】:

我试图重现您的问题,但对我来说效果很好。您能否让我知道您的数据是否会在一段时间后插入到目标表中? 谢谢@rmesteves。不,它不会在一段时间后(周末)显示出来,即使测试插入只有 7 行。 您是在插入后复制数据吗?所以也许这就是问题所在。如果您在这方面有任何进展,请告诉我 @rmesteves:是的,这是由于插入后立即复制所致。我在编辑的问题中添加的“insert_table_wait”函数似乎确实可以通过等待插入完全完成来解决问题。 (虽然这确实需要很长时间——通常是 75-90 分钟——才能完成。痛苦!)我欢迎任何关于替代品的建议。奇怪的是,可以在插入完成后立即查询表中插入的行,而不是复制该表。耸耸肩。 我有一个建议给你。我将直接发布它作为答案,因为它很难在 cmets 中发布代码。我们可以在答案中讨论它是否达到您的预期行为 【参考方案1】:

由于流缓冲区中的数据存在copy jobs 的限制,我建议您使用query job 并设置目标表,如下面的代码:

from google.cloud import bigquery
client = bigquery.Client(project = "your-project")

job_config = bigquery.QueryJobConfig(destination="destination-table-id")
# SELECT * to copy the whole table
sql = "SELECT * FROM <source_table>"
query_job = client.query(sql, job_config=job_config)
query_job.result()

如果对你有帮助,请告诉我

【讨论】:

非常感谢!我会试一试并报告。 伟大的@IanCrew。如果最终这个答案对您有帮助,请考虑接受或投票:)

以上是关于BigQuery Python API copy_table 复制架构但不复制数据的主要内容,如果未能解决你的问题,请参考以下文章

通过 Python API 客户端将经过验证的查询发送到 BigQuery 时出现语法错误

是否已更改Python BigQuery API?

使用 python 和 BigQuery API 获取 BigQuery 数据集中的表列表

使用 Python 和 API 创建 Bigquery 分区表

如何使用 Python BigQuery API 追加到 BigQuery 中的表

如何使用 google-api-python-client 设置 BigQuery 配置属性?