将结果写入 bigquery 中的永久表
Posted
技术标签:
【中文标题】将结果写入 bigquery 中的永久表【英文标题】:Write results to permanent table in bigquery 【发布时间】:2017-03-27 07:47:15 【问题描述】:我在 Bigquery SQL 中使用命名参数,并希望将结果写入永久表。我有两个函数 1 用于使用命名查询参数和 1 用于将查询结果写入表。如何将两者结合起来将查询结果写入表;具有命名参数的查询。
这是使用参数化查询的函数:
def sync_query_named_params(column_name,min_word_count,value):
query = """with lsq_results as
(select "%s" = @min_word_count)
replace (%s AS %s)
from lsq.lsq_results
""" % (min_word_count,value,column_name)
client = bigquery.Client()
query_results = client.run_sync_query(query
,
query_parameters=(
bigquery.ScalarQueryParameter('column_name', 'STRING', column_name),
bigquery.ScalarQueryParameter(
'min_word_count',
'STRING',
min_word_count),
bigquery.ScalarQueryParameter('value','INT64',value)
))
query_results.use_legacy_sql = False
query_results.run()
写入永久表的函数
class BigQueryClient(object):
def __init__(self, bq_service, project_id, swallow_results=True):
self.bigquery = bq_service
self.project_id = project_id
self.swallow_results = swallow_results
self.cache =
def write_to_table(
self,
query,
dataset=None,
table=None,
external_udf_uris=None,
allow_large_results=None,
use_query_cache=None,
priority=None,
create_disposition=None,
write_disposition=None,
use_legacy_sql=None,
maximum_billing_tier=None,
flatten=None):
configuration =
"query": query,
if dataset and table:
configuration['destinationTable'] =
"projectId": self.project_id,
"tableId": table,
"datasetId": dataset
if allow_large_results is not None:
configuration['allowLargeResults'] = allow_large_results
if flatten is not None:
configuration['flattenResults'] = flatten
if maximum_billing_tier is not None:
configuration['maximumBillingTier'] = maximum_billing_tier
if use_query_cache is not None:
configuration['useQueryCache'] = use_query_cache
if use_legacy_sql is not None:
configuration['useLegacySql'] = use_legacy_sql
if priority:
configuration['priority'] = priority
if create_disposition:
configuration['createDisposition'] = create_disposition
if write_disposition:
configuration['writeDisposition'] = write_disposition
if external_udf_uris:
configuration['userDefinedFunctionResources'] = \
[ 'resourceUri': u for u in external_udf_uris ]
body =
"configuration":
'query': configuration
logger.info("Creating write to table job %s" % body)
job_resource = self._insert_job(body)
self._raise_insert_exception_if_error(job_resource)
return job_resource
如何结合这两个函数来编写参数化查询并将结果写入永久表?或者如果有另一种更简单的方法。请提出建议。
【问题讨论】:
我的意思是问,如何在 write_to_table 函数中传递参数化查询?如,如果我需要传递参数化查询,我应该在 write_to_table() 中传递什么来代替查询? 【参考方案1】:您似乎在使用两个不同的客户端库。
您的第一个代码示例使用了 BigQuery 客户端库的 Beta 版,但我暂时建议不要使用它,因为它需要大量修改才能被认为普遍可用。 (如果您确实使用它,我建议使用run_async_query()
使用所有可用参数创建作业,然后调用results()
以获取QueryResults
对象。)
您的第二个代码示例是直接创建作业资源,这是一个较低级别的接口。使用此方法时,您可以直接在查询配置中指定 configuration.query.queryParameters 字段。这是我现在推荐的方法。
【讨论】:
以上是关于将结果写入 bigquery 中的永久表的主要内容,如果未能解决你的问题,请参考以下文章
如何获取 ValueProvider 的值并将其写入 BigQuery 表?
BigQuery:写入查询结果时使用 bigquery 作业的意外行为
如何将 Spark-BigQuery_connector 用于现有的 spark 环境(不使用 google dataproc)