无法使用python根据BQ中的查询创建另一个表

Posted

技术标签:

【中文标题】无法使用python根据BQ中的查询创建另一个表【英文标题】:Unable to create another table based on query in BQ using python 【发布时间】:2021-04-01 10:14:18 【问题描述】:

我正在尝试创建一个表,该表是来自另一个表的查询的产物。 基本上我想生成另一个名为main_table FROM stg_table 的表。

script.sql:

SELECT * FROM `project_id.dataset.stg_table`

my_python.py:

def get_field_schema(field):

    name = field['name']
    field_type = field['type']
    mode = field['mode']
    fields = field.get('fields', [])

    if fields:
        subschema = []
        for val in fields:
            fields_res = get_field_schema(val)
            subschema.append(fields_res)
    else:
        subschema = []

    field_schema = bigquery.SchemaField(name=name, field_type=field_type,
                                        mode=mode, fields=subschema)

    return field_schema


def parse_bq_json_schema(schema_filename):

    schema = []
    with open(schema_filename, 'r') as infile:
        jsonschema = json.load(infile)

    for field in jsonschema:
        schema.append(get_field_schema(field))

    return schema

def push_to_main_bq(final_table, tbl_schema_json, sql_file):

    bq_client = bigquery.Client.from_service_account_json(key_path)
    
    with open(sql_file, 'r') as file:
        query = file.read()
    query = query.format(PROJECT_ID=PROJECT_ID)

    job_config = bigquery.QueryJobConfig(
        allow_large_results=True, use_legacy_sql=False
    )

    job_config.schema = parse_bq_json_schema(tbl_schema_json) #ERROR HERE
    # Start the query, passing in the extra configuration.
    query_job = bq_client.query(query, job_config=job_config)  # Make an API request.
    print("Starting BQ load job  for ..".format(query_job.job_id, PROJECT_ID,
        DATASET_NAME, final_table))
    query_job.result()  # Wait for the job to complete.
    print("BQ load job finished.")

push_to_main_bq("main_table","table_schema.json", "script.sql")

get_field_schema()parse_bq_json_schema() 将产生以下结果:

[SchemaField('column_1', 'INTEGER', 'NULLABLE', None, (), None),......]

我认为这不会导致任何问题。因为我在其他地方使用过它,它们工作得很好。 但是,我在push_to_main_bq() 在线job_config.schema = parse_bq_json_schema(tbl_schema_json) 上遇到错误说:

文件 "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", 第 697 行,在 setattr 中 “ 的属性 未知。”.format(name, type(self)) AttributeError: .

【问题讨论】:

【参考方案1】:

QueryJobConfig 没有schema 属性。请改用destination。 找例子here:

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the destination table.
# table_id = "your-project.your_dataset.your_table_name"

sql = "SELECT * FROM `bigquery-public-data.samples.shakespeare`"
cluster_fields = ["corpus"]

job_config = bigquery.QueryJobConfig(
    clustering_fields=cluster_fields, destination=table_id
)

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config)  # Make an API request.
query_job.result()  # Wait for the job to complete.

table = client.get_table(table_id)  # Make an API request.
if table.clustering_fields == cluster_fields:
    print(
        "The destination table is written using the cluster_fields configuration."
    )

【讨论】:

需要clustering_fields 吗?它有什么用? 不,不需要。这是一种优化,可加快按此字段过滤的查询。 我还有一个问题,好像QueryJobCOnfig() 在我添加了time_partitioning 后没有运行。如果您可以查看,我可以更新我的代码 最好提出一个新问题,以便其他人看到它,因为我很快就会下线。

以上是关于无法使用python根据BQ中的查询创建另一个表的主要内容,如果未能解决你的问题,请参考以下文章

BQ 命令行工具出错:无法在没有项目 ID 的情况下启动作业

将 BQ 查询结果下载到 Python 数据框时出现 504 Deadline Exceeded 错误

通过传输功能将数据从一个项目复制到 BQ 中的另一个项目是不是需要成本?

替换 bq SQL 中的撇号或引号

在 Google Big Query 中使用 bq 命令行执行查询

接收带变量的 HTTP 请求,查询 BQ 并返回响应