Getting BigQuery temporary table "destination table" using Python API
Posted: 2020-09-17 18:32:33

Question: My use case is to create a temporary table, load it from a SELECT query, and then extract that table to Cloud Storage as CSV, all through the Python API.

I can create and load the temporary table with a query job, but I cannot find in the job response the "destination table" that the export to Cloud Storage requires.
Here is the code:
import uuid

from google.cloud import bigquery

# Path to the service-account key; `settings` comes from the surrounding project configuration.
bq_key = settings.BASE_DIR + '/api_keys/storage_bq_admin.json'
bq_client = bigquery.Client.from_service_account_json(bq_key, project='my-project-id')
query = """
EXECUTE IMMEDIATE
"CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `project-id.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = '123456' AND inserted_at > '2020-09-17 00:59:11.461')"
"""
query_job = bq_client.query(query, job_id="segment_temp_%s" % str(uuid.uuid4())) # Make an API request.
results = query_job.result() # Waits for job to complete.
bq_job_id = query_job.job_id
print(query_job.__dict__)
query_job.__dict__ prints:
{'_retry': <google.api_core.retry.Retry object at 0x7fdc41758748>,
 '_result': <google.cloud.bigquery.job.QueryJob object at 0x7fdc3ca682e8>,
 '_exception': None,
 '_result_set': True,
 '_polling_thread': None,
 '_done_callbacks': [],
 '_properties': {
     'kind': 'bigquery#job',
     'etag': '3uEKLSpG6pZPeLsnzA==',
     'id': 'pid-107805:US.segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56',
     'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/pid-107805/jobs/segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56?location=US',
     'user_email': '',
     'configuration': {
         'query': {
             'query': '\n EXECUTE IMMEDIATE\n "CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `pid.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = \'cl3666dnx3klmb\' AND inserted_at > \'2020-09-17 00:59:11.461\')"\n ',
             'priority': 'INTERACTIVE',
             'useLegacySql': False},
         'jobType': 'QUERY'},
     'jobReference': {
         'projectId': 'pid-107805',
         'jobId': 'segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56',
         'location': 'US'},
     'statistics': {
         'creationTime': 1600359344198.0,
         'startTime': 1600359344308.0,
         'endTime': 1600359346615.0,
         'totalBytesProcessed': '1292600',
         'query': {
             'totalBytesProcessed': '1292600',
             'totalBytesBilled': '10485760',
             'totalSlotMs': '6637',
             'statementType': 'SCRIPT'},
         'totalSlotMs': '6637',
         'numChildJobs': '1',
         'scriptStatistics': {}},
     'status': {'state': 'DONE'}},
 '_client': <google.cloud.bigquery.client.Client object at 0x7fdc42448588>,
 '_completion_lock': <unlocked _thread.lock object at 0x7fdc42355d00>,
 '_configuration': <google.cloud.bigquery.job.QueryJobConfig object at 0x7fdc423d8fd0>,
 '_query_results': <google.cloud.bigquery.query._QueryResults object at 0x7fdc42467da0>,
 '_done_timeout': None,
 '_transport_timeout': None}
The same job retrieved through the API Explorer in the documentation, using the job ID:
"kind": "bigquery#job",
"etag": "3uEKLSpg961G6pZPeA==",
"id": "pid-107805:US.segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56",
"selfLink": "https://content-bigquery.googleapis.com/bigquery/v2/projects/pid-107805/jobs/segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56?location=US",
"user_email": "storage-bq-admin@pid-107805.iam.gserviceaccount.com",
"configuration":
"query":
"query": "\n EXECUTE IMMEDIATE\n \"CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `pid-107805.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = 'cl3666dnx3klmb' AND inserted_at \u003e '2020-09-17 00:59:11.461')\"\n ",
"priority": "INTERACTIVE",
"useLegacySql": false
,
"jobType": "QUERY"
,
"jobReference":
"projectId": "pid-107805",
"jobId": "segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56",
"location": "US"
,
"statistics":
"creationTime": "1600359344198",
"startTime": "1600359344308",
"endTime": "1600359346615",
"totalBytesProcessed": "1292600",
"query":
"totalBytesProcessed": "1292600",
"totalBytesBilled": "10485760",
"totalSlotMs": "6637",
"statementType": "SCRIPT"
,
"totalSlotMs": "6637",
"numChildJobs": "1",
"scriptStatistics":
,
"status":
"state": "DONE"
Neither response contains the destinationTable details I need; I expected the query job to carry the datasetId (and tableId) of the newly created table. I'm not sure what I'm missing here.
Answer 1:

The problem is the misuse of EXECUTE IMMEDIATE and CREATE TEMP TABLE in your code. If you drop both, the query becomes a plain SELECT, for example:
query = """
SELECT user_id,client_id,inserted_at ...
"""
You will then find the destination table on the job, something like:
"configuration":
"jobType": "QUERY",
"query":
"destinationTable": <======== what you're looking for
"datasetId": "_c53c0a2640dc04748b94ebc5d7193a6976b85fa1",
"projectId": "yourProject",
"tableId": "anon8b75560af5d60d88fd40befe1371bb83696c86e1"
,
...
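In the Python client the same field is exposed as QueryJob.destination, and that reference can be fed straight into an extract job. Below is a minimal sketch under the answer's premise (plain SELECT, no script); the query, bucket name and object path are placeholders, and it assumes the anonymous result table is accepted as the source of an extract job, as this answer implies:

from google.cloud import bigquery

bq_client = bigquery.Client()  # or Client.from_service_account_json(...) as in the question

query_job = bq_client.query(
    "SELECT user_id, client_id, inserted_at "
    "FROM `project-id.prod.users_partition_by_client` "
    "WHERE partition_id = 3666 AND client_id = '123456'"
)
query_job.result()  # wait for the query to finish

# A plain (non-script) query writes its result to an anonymous temporary table;
# the finished job exposes a reference to it.
temp_table = query_job.destination
print(temp_table.project, temp_table.dataset_id, temp_table.table_id)

# Export that temporary table to Cloud Storage (CSV is the default extract format).
extract_job = bq_client.extract_table(
    temp_table,
    "gs://your-bucket/exports/segusers1-*.csv",  # placeholder destination URI
)
extract_job.result()  # wait for the export to finish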
Discussion:

- Doesn't this create a regular table? I'm trying to create a temp table (which is what needs the scripting), not a regular table; cutting cost is the main reason I went with a temp table.
- The query result table and the TEMP table created inside the script both live in the temporary (anonymous) dataset and both expire after 24 hours, so there is no difference at all.
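If the script form (EXECUTE IMMEDIATE / CREATE TEMP TABLE) is kept anyway, the parent job has statementType SCRIPT and no destinationTable of its own; the table it wrote shows up on its child job (numChildJobs was 1 above). A hedged sketch, assuming the parent_job filter of Client.list_jobs available in recent google-cloud-bigquery releases:

# The script job itself carries no destination; list its child jobs
# and read the destination table reference from there.
for child_job in bq_client.list_jobs(parent_job=query_job.job_id):
    dest = getattr(child_job, "destination", None)  # TableReference the child job wrote to
    if dest is not None:
        print(dest.project, dest.dataset_id, dest.table_id)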