无法使用python根据BQ中的查询创建另一个表
Posted
技术标签:
【中文标题】无法使用python根据BQ中的查询创建另一个表【英文标题】:Unable to create another table based on query in BQ using python 【发布时间】:2021-04-01 10:14:18 【问题描述】:我正在尝试创建一个表,该表是来自另一个表的查询的产物。
基本上我想生成另一个名为main_table
FROM stg_table
的表。
script.sql
:
SELECT * FROM `project_id.dataset.stg_table`
my_python.py
:
def get_field_schema(field):
name = field['name']
field_type = field['type']
mode = field['mode']
fields = field.get('fields', [])
if fields:
subschema = []
for val in fields:
fields_res = get_field_schema(val)
subschema.append(fields_res)
else:
subschema = []
field_schema = bigquery.SchemaField(name=name, field_type=field_type,
mode=mode, fields=subschema)
return field_schema
def parse_bq_json_schema(schema_filename):
schema = []
with open(schema_filename, 'r') as infile:
jsonschema = json.load(infile)
for field in jsonschema:
schema.append(get_field_schema(field))
return schema
def push_to_main_bq(final_table, tbl_schema_json, sql_file):
bq_client = bigquery.Client.from_service_account_json(key_path)
with open(sql_file, 'r') as file:
query = file.read()
query = query.format(PROJECT_ID=PROJECT_ID)
job_config = bigquery.QueryJobConfig(
allow_large_results=True, use_legacy_sql=False
)
job_config.schema = parse_bq_json_schema(tbl_schema_json) #ERROR HERE
# Start the query, passing in the extra configuration.
query_job = bq_client.query(query, job_config=job_config) # Make an API request.
print("Starting BQ load job for ..".format(query_job.job_id, PROJECT_ID,
DATASET_NAME, final_table))
query_job.result() # Wait for the job to complete.
print("BQ load job finished.")
push_to_main_bq("main_table","table_schema.json", "script.sql")
get_field_schema()
和 parse_bq_json_schema()
将产生以下结果:
[SchemaField('column_1', 'INTEGER', 'NULLABLE', None, (), None),......]
我认为这不会导致任何问题。因为我在其他地方使用过它,它们工作得很好。
但是,我在push_to_main_bq()
在线job_config.schema = parse_bq_json_schema(tbl_schema_json)
上遇到错误说:
文件 "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", 第 697 行,在 setattr 中 “ 的属性 未知。”.format(name, type(self)) AttributeError:
.
【问题讨论】:
【参考方案1】:QueryJobConfig 没有schema
属性。请改用destination。
找例子here:
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the destination table.
# table_id = "your-project.your_dataset.your_table_name"
sql = "SELECT * FROM `bigquery-public-data.samples.shakespeare`"
cluster_fields = ["corpus"]
job_config = bigquery.QueryJobConfig(
clustering_fields=cluster_fields, destination=table_id
)
# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config) # Make an API request.
query_job.result() # Wait for the job to complete.
table = client.get_table(table_id) # Make an API request.
if table.clustering_fields == cluster_fields:
print(
"The destination table is written using the cluster_fields configuration."
)
【讨论】:
需要clustering_fields
吗?它有什么用?
不,不需要。这是一种优化,可加快按此字段过滤的查询。
我还有一个问题,好像QueryJobCOnfig()
在我添加了time_partitioning 后没有运行。如果您可以查看,我可以更新我的代码
最好提出一个新问题,以便其他人看到它,因为我很快就会下线。以上是关于无法使用python根据BQ中的查询创建另一个表的主要内容,如果未能解决你的问题,请参考以下文章
BQ 命令行工具出错:无法在没有项目 ID 的情况下启动作业
将 BQ 查询结果下载到 Python 数据框时出现 504 Deadline Exceeded 错误
通过传输功能将数据从一个项目复制到 BQ 中的另一个项目是不是需要成本?