How to write multiple nested JSON to BigQuery table using Apache Beam (Python)

Posted: 2021-01-07 06:48:11

Question:

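I'm using Python on Dataflow to write a collection of complex JSON objects to a BigQuery table. Building the table schema by hand, as shown below, is too cumbersome because my JSON objects are nested several levels deep.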

from apache_beam.io.gcp.internal.clients import bigquery

table_schema=bigquery.TableSchema()

id_schema = bigquery.TableFieldSchema()
id_schema.name = 'ID'
id_schema.type = 'integer'
id_schema.mode = 'nullable'
table_schema.fields.append(id_schema)
...

So I tried the approach recommended in "Writing nested schema to BigQuery from Dataflow (Python)". First, I ran the following command in the Cloud Console to get the schema:

bq --format=json show project:dataset.table > output_schema.json

Then I ran the following code to get the table schema:

table_schema = parse_table_schema_from_json(json.dumps(json.load(open("output_schema.json"))["schema"]))
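The one-liner above never closes the file handle and relies on imports that are not shown. A slightly more defensive sketch of the loading step, using only the standard library, could look like this. `load_bq_schema` is a hypothetical helper name, and the commented usage assumes `parse_table_schema_from_json` is imported from `apache_beam.io.gcp.bigquery_tools`.

```python
import json


def load_bq_schema(path):
    """Return the "schema" object from a `bq --format=json show` dump as a dict."""
    with open(path, encoding="utf-8") as fh:
        table_info = json.load(fh)
    if "schema" not in table_info:
        raise KeyError(f'no "schema" key found in {path}')
    return table_info["schema"]


# Usage (assumes apache_beam[gcp] is installed; not executed here):
# from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
# table_schema = parse_table_schema_from_json(
#     json.dumps(load_bq_schema("output_schema.json")))
```

Keeping the schema as a plain dict until the last step also makes it easy to inspect or adjust before handing it to Beam.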

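This works exactly as expected. The table was originally created from a Jupyter notebook, where I could write to BigQuery using bigquery.LoadJobConfig with autodetect enabled, without supplying a schema.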

Now I'm trying to write to BigQuery with this schema from an Apache Beam pipeline, but I get errors such as:

WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will retry. Errors were [<InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: ''
 location: 'sectiontokens.documents'
 message: 'Array specified for non-repeated field.'
 reason: 'invalid'>]
 index: 0>, <InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: ''
 location: 'sectiontokens.errors'
 message: 'Array specified for non-repeated field.'
 reason: 'invalid'>]
 index: 1>, <InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: ''
 location: 'sectiontokens.documents'
 message: 'Array specified for non-repeated field.'
 reason: 'invalid'>]
 index: 2>]

My table schema is:

table_schema = {
  "fields": [
    {"name": "ID", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "SourceResourceID","type": "STRING","mode": "NULLABLE"},
    {"name": "DocumentText","type": "STRING","mode": "NULLABLE"},
    {"name": "DocumentName","type": "STRING","mode": "NULLABLE"},
    {"name": "EncounterNumber","type": "FLOAT","mode": "NULLABLE"},
    {"name": "EncounterResourceID","type": "STRING","mode": "NULLABLE"},
    {"name": "DocumentId","type": "STRING","mode": "NULLABLE"},
    {"name": "DocumentDate","type": "TIMESTAMP","mode": "NULLABLE"},
    {"name": "SectionTitle","type": "STRING","mode": "NULLABLE"},
    {"name": "SectionHeader","type": "STRING","mode": "NULLABLE"},
    {"name": "SectionText","type": "STRING","mode": "NULLABLE"},
    {"name": "SectionTokens","type": "RECORD","mode": "NULLABLE",
      "fields": [
        {"name": "documents","type": "RECORD","mode": "NULLABLE",
          "fields": [
            {"name": "list","type": "RECORD","mode": "REPEATED",
              "fields": [
                {"name": "item","type": "RECORD","mode": "NULLABLE",
                  "fields": [
                    {"name": "entities","type": "RECORD","mode": "NULLABLE",
                      "fields": [
                        {"name": "list","type": "RECORD","mode": "REPEATED",
                          "fields": [
                            {"name": "item","type": "RECORD","mode": "NULLABLE",
                              "fields": [
                                {"name": "category","type": "STRING","mode": "NULLABLE"},
                                {"name": "confidenceScore","type": "FLOAT","mode": "NULLABLE"},
                                {"name": "id","type": "STRING","mode": "NULLABLE"},
                                {"name": "isNegated","type": "BOOLEAN","mode": "NULLABLE"},
                                {"name": "length","type": "INTEGER","mode": "NULLABLE"},
                                {"name": "links","type": "RECORD","mode": "NULLABLE",
                                  "fields": [
                                    {"name": "list","type": "RECORD","mode": "REPEATED",
                                      "fields": [
                                        {"name": "item","type": "RECORD","mode": "NULLABLE",
                                          "fields": [
                                            {"name": "dataSource","type": "STRING","mode":"NULLABLE"},
                                            {"name": "id","type": "STRING","mode": "NULLABLE"}
                                          ]
                                        }
                                      ]
                                    }
                                  ]
                                },
                                {"name": "offset","type": "INTEGER","mode": "NULLABLE"},
                                {"name": "text","type": "STRING","mode": "NULLABLE"}
                              ]
                            }
                          ]
                        }
                      ]
                    },
                    {"name": "id","type": "STRING","mode": "NULLABLE"},
                    {"name": "relations","type": "RECORD","mode": "NULLABLE",
                      "fields": [
                        {"name": "list","type": "RECORD","mode": "REPEATED",
                          "fields": [
                            {"name": "item","type": "RECORD","mode": "NULLABLE",
                              "fields": [
                                {"name": "bidirectional","type": "BOOLEAN","mode": "NULLABLE"},
                                {"name": "relationType","type": "STRING","mode": "NULLABLE"},
                                {"name": "source","type": "STRING","mode": "NULLABLE"},
                                {"name": "target","type": "STRING","mode": "NULLABLE"}
                              ]
                            }
                          ]
                        }
                      ]
                    }
                  ]
                }
              ]
            }
          ]
        },
        {"name": "errors","type": "RECORD","mode": "NULLABLE",
          "fields": [
            {"name": "list","type": "RECORD","mode": "REPEATED",
              "fields": [
                {"name": "item","type": "RECORD","mode": "NULLABLE",
                  "fields": [
                    {"name": "error","type": "RECORD","mode": "NULLABLE",
                      "fields": [
                        {"name": "code","type": "STRING","mode": "NULLABLE"},
                        {"name": "innererror","type": "RECORD","mode": "NULLABLE",
                          "fields": [
                            {"name": "code","type": "STRING","mode": "NULLABLE"},
                            {"name": "message","type": "STRING","mode": "NULLABLE"}
                          ]
                        },
                        {"name": "message","type": "STRING","mode": "NULLABLE"}
                      ]
                    },
                    {"name": "id","type": "STRING","mode": "NULLABLE"}
                  ]
                }
              ]
            }
          ]
        },
        {"name": "modelVersion","type": "STRING","mode": "NULLABLE"}
      ]
    }
  ]
}

Here is some sample data:

{'ID': 123, 'SourceResourceID': 'Resource/3c81b4d2-3ee9-11eb-8bf6-0242ac100303', 'DocumentText': 'EXAM:  CT CHEST IC  \n\n\nPROCEDURE DATE:  12/11/2020  \n', 'DocumentName': 'CT CHEST IC', 'EncounterNumber': None, 'EncounterResourceID': 'Encounter/123', 'DocumentId': '123', 'DocumentDate': '2020-12-15 10:21:00 UTC', 'SectionTitle': 'physical_exam', 'SectionHeader': 'EXAM:', 'SectionText': 'EXAM:  CT CHEST IC  \n\n\nPROCEDURE DATE:  12/11/2020  \n \n\n\n', 'SectionTokens': {'documents': [{'id': '1', 'entities': [{'id': '0', 'offset': 7, 'length': 11, 'text': 'CT CHEST IC', 'category': 'ExaminationName', 'confidenceScore': 0.98, 'isNegated': False}]}], 'errors': [], 'modelVersion': '2020-09-03'}}

Can anyone help me figure out what I'm doing wrong? Thanks.

Answer 1:

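In your schema, sectiontokens.documents and sectiontokens.errors are specified as type RECORD with mode NULLABLE, which means BigQuery expects a single record in each of those fields. In your data, however, these keys actually hold lists of objects.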
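To see this kind of mismatch before the rows reach BigQuery, a small standard-library check can walk a row alongside the schema and report every path where a Python list sits under a field that is not REPEATED. This is only a sketch: `find_array_mismatches` is a hypothetical helper, the schema below is a trimmed copy of the one in the question, and matching keys case-insensitively is an assumption about how BigQuery treats column names.

```python
def find_array_mismatches(row, fields, prefix=""):
    """Yield dotted paths where the row holds a list but the field is not REPEATED."""
    by_name = {f["name"].lower(): f for f in fields}
    for key, value in row.items():
        field = by_name.get(key.lower())
        if field is None:
            continue  # keys missing from the schema are out of scope for this check
        path = f"{prefix}{key}"
        repeated = field.get("mode", "NULLABLE").upper() == "REPEATED"
        if isinstance(value, list) and not repeated:
            yield path
            continue
        if field.get("type", "").upper() in ("RECORD", "STRUCT"):
            children = value if isinstance(value, list) else [value]
            for child in children:
                if isinstance(child, dict):
                    yield from find_array_mismatches(
                        child, field.get("fields", []), path + ".")


# Trimmed version of the schema from the question (inner fields omitted).
schema_fields = [
    {"name": "ID", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "SectionTokens", "type": "RECORD", "mode": "NULLABLE", "fields": [
        {"name": "documents", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": []},
        ]},
        {"name": "errors", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": []},
        ]},
        {"name": "modelVersion", "type": "STRING", "mode": "NULLABLE"},
    ]},
]
row = {"ID": 123, "SectionTokens": {
    "documents": [{"id": "1"}], "errors": [], "modelVersion": "2020-09-03"}}

mismatches = sorted(find_array_mismatches(row, schema_fields))
print(mismatches)  # ['SectionTokens.documents', 'SectionTokens.errors']
```

The two reported paths are the same locations that appear in the insert errors from the question.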

If you want to define a column that accepts a list of objects, it needs "mode": "REPEATED". See https://cloud.google.com/bigquery/docs/nested-repeated
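One way to apply this to the schema in the question is to collapse each `list` / `item` wrapper into a single REPEATED field, so that a plain Python list in the row lines up with the column. The sketch below assumes those wrapper layers are not wanted in the final table; the helper names are hypothetical. As far as I know, BigQuery does not allow an existing column to be changed from NULLABLE to REPEATED in place, so the resulting schema would probably have to be used with a new table.

```python
def _is_list_wrapper(children):
    """True for a single REPEATED child named "list" holding a single "item" field."""
    if len(children) != 1:
        return False
    wrapper = children[0]
    inner = wrapper.get("fields", [])
    return (
        wrapper.get("name") == "list"
        and wrapper.get("mode") == "REPEATED"
        and len(inner) == 1
        and inner[0].get("name") == "item"
    )


def collapse_list_wrappers(fields):
    """Return a copy of `fields` with list/item wrappers replaced by REPEATED fields."""
    result = []
    for field in fields:
        field = dict(field)  # shallow copy so the input schema is left untouched
        children = field.get("fields", [])
        if _is_list_wrapper(children):
            item = children[0]["fields"][0]
            field["mode"] = "REPEATED"
            field["type"] = item["type"]  # take over the element type
            children = item.get("fields", [])
        if children:
            field["fields"] = collapse_list_wrappers(children)
        else:
            field.pop("fields", None)
        result.append(field)
    return result


# Small fragment shaped like "documents" in the question's schema.
wrapped = [
    {"name": "documents", "type": "RECORD", "mode": "NULLABLE", "fields": [
        {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "item", "type": "RECORD", "mode": "NULLABLE", "fields": [
                {"name": "id", "type": "STRING", "mode": "NULLABLE"},
            ]},
        ]},
    ]},
]
flat = collapse_list_wrappers(wrapped)
print(flat)
```

The alternative, if the existing table has to stay as it is, would be to reshape each row so that every list is nested under `list` and `item` keys, matching the schema instead of changing it.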
