Getting BigQuery temporary table "destination table" using Python API


【Posted】:2020-09-17 18:32:33 【Question】:

My use case is to create a temporary table, load it from a SELECT query, and then extract that table to a CSV file in Cloud Storage, all via the Python API.

I can create and load the temporary table with a query job, but I can't find the "destination table" needed for the Cloud Storage export anywhere in the job response.

Here is the code:

    import uuid  # used below to generate a unique job_id

    from google.cloud import bigquery

    # `settings` is the surrounding project's configuration object (e.g. Django settings)
    bq_key = settings.BASE_DIR + '/api_keys/storage_bq_admin.json'
    bq_client = bigquery.Client.from_service_account_json(bq_key, project='my-project-id')

    query = """
            EXECUTE IMMEDIATE
      "CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `project-id.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = '123456' AND inserted_at > '2020-09-17 00:59:11.461')"
            """
    query_job = bq_client.query(query, job_id="segment_temp_%s" % str(uuid.uuid4()))  # Make an API request.

    results = query_job.result()  # Waits for job to complete.
    bq_job_id = query_job.job_id
    print(query_job.__dict__)

query_job returns:

{'_retry': <google.api_core.retry.Retry object at 0x7fdc41758748>,
 '_result': <google.cloud.bigquery.job.QueryJob object at 0x7fdc3ca682e8>,
 '_exception': None,
 '_result_set': True,
 '_polling_thread': None,
 '_done_callbacks': [],
 '_properties': {'kind': 'bigquery#job',
                 'etag': '3uEKLSpG6pZPeLsnzA==',
                 'id': 'pid-107805:US.segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56',
                 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/pid-107805/jobs/segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56?location=US',
                 'user_email': '',
                 'configuration': {'query': {'query': '\n            EXECUTE IMMEDIATE\n      "CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `pid.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = \'cl3666dnx3klmb\' AND inserted_at > \'2020-09-17 00:59:11.461\')"\n            ',
                                             'priority': 'INTERACTIVE',
                                             'useLegacySql': False},
                                   'jobType': 'QUERY'},
                 'jobReference': {'projectId': 'pid-107805',
                                  'jobId': 'segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56',
                                  'location': 'US'},
                 'statistics': {'creationTime': 1600359344198.0,
                                'startTime': 1600359344308.0,
                                'endTime': 1600359346615.0,
                                'totalBytesProcessed': '1292600',
                                'query': {'totalBytesProcessed': '1292600',
                                          'totalBytesBilled': '10485760',
                                          'totalSlotMs': '6637',
                                          'statementType': 'SCRIPT'},
                                'totalSlotMs': '6637',
                                'numChildJobs': '1',
                                'scriptStatistics': {}},
                 'status': {'state': 'DONE'}},
 '_client': <google.cloud.bigquery.client.Client object at 0x7fdc42448588>,
 '_completion_lock': <unlocked _thread.lock object at 0x7fdc42355d00>,
 '_configuration': <google.cloud.bigquery.job.QueryJobConfig object at 0x7fdc423d8fd0>,
 '_query_results': <google.cloud.bigquery.query._QueryResults object at 0x7fdc42467da0>,
 '_done_timeout': None,
 '_transport_timeout': None}
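For what it's worth, the client library's own accessor for this field, `query_job.destination`, would presumably come back as `None` here, since the `configuration.query` section in the dump above has no `destinationTable` key:

    # Sketch: the property that normally holds the anonymous result table reference.
    # For this EXECUTE IMMEDIATE script job it appears to be missing, matching the dump above.
    print(query_job.destination)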

From the documentation API Explorer, using the job ID:


  "kind": "bigquery#job",
  "etag": "3uEKLSpg961G6pZPeA==",
  "id": "pid-107805:US.segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56",
  "selfLink": "https://content-bigquery.googleapis.com/bigquery/v2/projects/pid-107805/jobs/segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56?location=US",
  "user_email": "storage-bq-admin@pid-107805.iam.gserviceaccount.com",
  "configuration": 
    "query": 
      "query": "\n            EXECUTE IMMEDIATE\n      \"CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `pid-107805.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = 'cl3666dnx3klmb' AND inserted_at \u003e '2020-09-17 00:59:11.461')\"\n            ",
      "priority": "INTERACTIVE",
      "useLegacySql": false
    ,
    "jobType": "QUERY"
  ,
  "jobReference": 
    "projectId": "pid-107805",
    "jobId": "segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56",
    "location": "US"
  ,
  "statistics": 
    "creationTime": "1600359344198",
    "startTime": "1600359344308",
    "endTime": "1600359346615",
    "totalBytesProcessed": "1292600",
    "query": 
      "totalBytesProcessed": "1292600",
      "totalBytesBilled": "10485760",
      "totalSlotMs": "6637",
      "statementType": "SCRIPT"
    ,
    "totalSlotMs": "6637",
    "numChildJobs": "1",
    "scriptStatistics": 
  ,
  "status": 
    "state": "DONE"
  

Neither response contains the destinationTable details I need; I expected the query section to hold the datasetId of the newly created table. Not sure what I'm missing here.
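For reference, the export step I ultimately want to run looks roughly like this (the bucket and object path are placeholders, and `destination_table` is exactly the reference I can't find):

    # Rough sketch of the intended export; `bq_client` is the client created above,
    # `destination_table` is the missing temp-table reference, and the GCS URI is a placeholder.
    from google.cloud import bigquery

    extract_job = bq_client.extract_table(
        destination_table,                                # the temp table's reference
        "gs://my-export-bucket/exports/segusers1-*.csv",  # placeholder Cloud Storage URI
        job_config=bigquery.ExtractJobConfig(destination_format="CSV"),
    )
    extract_job.result()  # waits for the export job to finish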

【Comments】:

【Answer 1】:

This is a misuse of EXECUTE IMMEDIATE and CREATE TEMP TABLE in your code.

If you remove both, your query becomes a straightforward SELECT, for example:

    query = """
            SELECT user_id,client_id,inserted_at ...
            """

You will then be able to find the destination table in the job, similar to:


  "configuration": 
    "jobType": "QUERY", 
    "query": 
      "destinationTable":      <======== what you're looking for
        "datasetId": "_c53c0a2640dc04748b94ebc5d7193a6976b85fa1", 
        "projectId": "yourProject", 
        "tableId": "anon8b75560af5d60d88fd40befe1371bb83696c86e1"
      , 
...
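On the Python side the same information is exposed directly on the finished job object; a minimal sketch, reusing the `bq_client` from the question and abbreviating the query text:

    # Run the plain SELECT and read the anonymous destination table from the job.
    query = "SELECT user_id, client_id, inserted_at FROM ..."  # the plain SELECT, no EXECUTE IMMEDIATE / CREATE TEMP TABLE
    query_job = bq_client.query(query)
    query_job.result()                    # wait for the query to finish

    dest = query_job.destination          # TableReference to the anonymous result table
    print(dest.project, dest.dataset_id, dest.table_id)

That destination reference is what you can then pass to `extract_table` for the CSV export described in the question.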

【Discussion】:

Doesn't this create a regular table? I'm trying to create a temporary table, not a regular one, which is why the script is needed. Cutting costs is the main reason I chose a temporary table.

The result table of this query and the TEMP table created in the script both live in a temporary dataset with a 24-hour lifetime; there is no difference at all.
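A quick way to see that lifetime, assuming the `bq_client` and `query_job` names from the sketch in the answer: fetch the anonymous table and check its expiry.

    # Sketch: the anonymous result table carries an expiration roughly 24 hours out.
    anon_table = bq_client.get_table(query_job.destination)
    print(anon_table.full_table_id, anon_table.expires)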
