Json 架构文件不会在 BigQuery Python API 中执行

Posted

技术标签:

【中文标题】Json 架构文件不会在 BigQuery Python API 中执行【英文标题】:Json schema file will not execute in BigQuery Python API 【发布时间】:2021-05-09 14:05:22 【问题描述】:

我遇到了 Bigquery Python API 的问题。这是我执行脚本时的堆栈跟踪:

Traceback (most recent call last):
  File "createTable.py", line 17, in <module>
    open_schema()
  File "createTable.py", line 12, in open_schema
    table = bigquery.Table(table_id, schema=schema)
    ...
    "Schema items must either be fields or compatible "
ValueError: Schema items must either be fields or compatible mapping representations.

脚本很简单,打开一个schema文件并创建表:

   from google.cloud import bigquery
   # Construct a BigQuery client object.
   client = bigquery.Client()
   table_id = "project-py-290522:bq_dts.bq-test"
       
   def open_schema():
       with open("hcl-schema.json","r", encoding = "utf-8") as fName:
           schema = fName.readlines()
       
           table = bigquery.Table(table_id, schema=schema)
           print(repr(table))
           client.create_table(table)  # Make an API request.
       
   if __name__ == "__main__":
       open_schema()
       
   print("Created table ..".format(table.project, table.dataset_id, table.table_id)) 

当我在控制台和 CLI 中执行架构时,表会按原样完美创建。控制台和 CLI 如何执行创建表但在 API 中阻塞。我已经搜索和搜索并没有找到答案。有人可以帮忙吗?

这是存储在 hcl-schema.json 文件中的架构。为简洁起见,我缩短了属性列表,但在其他方面保持不变:

    [  
      
        "name":"user_id",
        "type":"STRING",
        "mode":"NULLABLE"
      ,
      
        "name":"msg_version",
        "type":"STRING",
        "mode":"REQUIRED"           
      ,
      
        "name":"APIStreamData",
        "type":"RECORD",
        "mode":"REQUIRED",
        "fields":
        [
          
            "name":"msg_version",
            "type":"STRING",
            "mode":"REQUIRED"
          ,
          
            "name":"streams",
            "type":"RECORD",
            "mode":"REPEATED",
            "fields":
            [
                
                  "name":"length",
                  "type":"STRING",
                  "mode":"REQUIRED"
                ,                      
                
                  "name":"cached",
                  "type":"STRING",
                  "mode":"NULLABLE"
                ,  
              
                "name":"track",
                "type":"RECORD",
                "mode":"REQUIRED",
                "fields":
                [
                  
                    "name":"msg_version",
                    "type":"STRING",
                    "mode":"REQUIRED"
                  ,
                  
                    "name":"track_id",
                    "type":"STRING",
                    "mode":"REQUIRED"
                  
                ]
                  
    
            ]
          
        ]
      
    ]

谢谢

茫然和困惑

【问题讨论】:

可能是文件编码的问题...尝试在table = bigquery.Table(table_id, schema=schema)之前打印出schema变量 【参考方案1】:

您可以使用 json 文件的 dict 表示,而不是原始问题中的字符串:

with open("schema.json") as json_file:
    schema_dict = json.load(json_file)
    table = bigquery.Table(table_id, schema=schema_dict)
    table = client.create_table(table)

【讨论】:

【参考方案2】:

我所指的以下文档显示了要在 Python 中指定的架构,如下所示 https://cloud.google.com/bigquery/docs/tables#creating_an_empty_table_with_a_schema_definition

schema = [bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),bigquery.SchemaField("age", "INTEGER", mode="REQUIRED")]

我尝试使用以下 JSON 和代码,效果很好。我认为您提供的 json 在 RECORD 中有 RECORD 所以我们需要相应地处理

from google.cloud import bigquery
import json
# Construct a BigQuery client object.
client = bigquery.Client()
table_id = "my-project.mock_dataset.bq-test"

def open_schema():
            
    bigquerySchema = []
    bigqueryfieldSchema = []
    with open('test.json') as f:
        bigqueryColumns = json.load(f)
        print(bigqueryColumns)
        for col in bigqueryColumns:
            if col['type'] != 'RECORD':
                print(col['name'])
                bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'],mode=col['mode']))
            else:
                for colfield in col['fields']:
                    bigqueryfieldSchema.append(bigquery.SchemaField(colfield['name'], colfield['type'],colfield['mode']))
                print(bigqueryfieldSchema)
                print(col['fields'])
                bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'],col['mode'],'',bigqueryfieldSchema))
        print(bigquerySchema)

    table = bigquery.Table(table_id, schema=bigquerySchema)
    print(repr(table))
    client.create_table(table)  # Make an API request.
    print("Created table ..".format(table.project, table.dataset_id, table.table_id)) 

if __name__ == "__main__":
    open_schema()
 [  
      
        "name":"user_id",
        "type":"STRING",
        "mode":"NULLABLE"
      ,
      
        "name":"msg_version",
        "type":"STRING",
        "mode":"REQUIRED"           
      ,
      
        "name":"APIStreamData",
        "type":"RECORD",
        "mode":"REQUIRED",
        "fields":
        [
          
            "name":"msg_version",
            "type":"STRING",
            "mode":"REQUIRED"
          ,
          
             "name":"track_id",
             "type":"STRING",
             "mode":"REQUIRED"
          
       ]
      
 ]

【讨论】:

非常感谢!这是一个巨大的帮助。现在我需要将其转换为对 json 的递归潜入。我不知道我会遇到多少嵌套记录 很高兴它对您有用。我认为根据您的嵌套级别,我们必须相应地更改脚本。 好吧,从表面上看,它似乎在工作。然而,经过几次失败后,我发现我还必须考虑代码页。但是... 我认为它可以帮助您解决问题,而不是将 json 作为模式传递,我们需要在使用 Python API 时将其作为 SchemaField 对象传递?如果是,请考虑接受并投票。 ***.com/help/someone-answers 我刚遇到这个问题,似乎 json 文件的简单 dict 表示对我有用。在这里查看我的其他答案。

以上是关于Json 架构文件不会在 BigQuery Python API 中执行的主要内容,如果未能解决你的问题,请参考以下文章

如何在将JSON文件加载到BigQuery表中时管理/处理架构更改

BigQuery 中的部分 JSON 架构

JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink

使用 bigquery 中的 bigquery select 语句的 JSON 格式的现有表的架构

在 Bigquery 中查询外部表并且新数据到达时没有架构自动检测

BigQuery 架构的定义/文档?