How to write multiple nested JSON to a BigQuery table using Apache Beam (Python)
Posted: 2021-01-07 06:48:11

Question: I'm using Python to write a collection of complex JSON objects from Dataflow into a BigQuery table. Creating the table schema by hand, as below, is too complicated because my JSON objects are nested several levels deep.
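# Building a schema by hand with Beam's BigQuery client message classes
from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

# One TableFieldSchema per column; a nested RECORD column gets its own .fields list
id_schema = bigquery.TableFieldSchema()
id_schema.name = 'ID'
id_schema.type = 'INTEGER'
id_schema.mode = 'NULLABLE'
table_schema.fields.append(id_schema)
...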
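A less verbose alternative, sketched under assumptions not stated in the question: the same schema can be written as a plain Python dict of the form {"fields": [...]}. As far as I know, recent Beam releases let WriteToBigQuery take the schema as such a dict (check the documentation for your SDK version). The field() helper below is hypothetical and only keeps the nesting readable.

```python
def field(name, type_="STRING", mode="NULLABLE", fields=None):
    """Build one BigQuery field description as a plain dict (hypothetical helper)."""
    spec = {"name": name, "type": type_, "mode": mode}
    if fields is not None:
        # RECORD columns carry their sub-columns in a nested "fields" list
        spec["fields"] = list(fields)
    return spec


# Top-level columns plus one nested RECORD column containing a REPEATED sub-record
table_schema = {
    "fields": [
        field("ID", "INTEGER"),
        field("DocumentName"),
        field("SectionTokens", "RECORD", fields=[
            field("documents", "RECORD", "REPEATED", fields=[
                field("id"),
            ]),
            field("modelVersion"),
        ]),
    ]
}
```

Each level of nesting is just another fields=[...] argument, so deep structures stay readable without creating TableFieldSchema objects one by one.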
So I tried the approach recommended in "Writing nested schema to BigQuery from Dataflow (Python)". First, I ran the following command in the Cloud Console to get the schema:
bq --format=json show project:dataset.table > output_schema.json
Then I ran the following code to get the table schema:
table_schema = parse_table_schema_from_json(json.dumps(json.load(open("output_schema.json"))["schema"]))
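The one-liner above does not explicitly close the file it opens. A slightly more explicit sketch using only the standard library: bq show --format=json prints the whole table resource, and its "schema" key holds the {"fields": [...]} object that gets serialized and passed to parse_table_schema_from_json. The function name load_schema_json is hypothetical.

```python
import json


def load_schema_json(path):
    """Return the {"fields": [...]} part of `bq show --format=json` output as a JSON string."""
    with open(path) as f:
        table_resource = json.load(f)
    # The table resource also holds tableReference, numRows, etc.; only "schema" is needed
    return json.dumps(table_resource["schema"])


# Usage (requires apache_beam):
# from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
# table_schema = parse_table_schema_from_json(load_schema_json("output_schema.json"))
```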
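This worked exactly as expected. The table was originally created from a Jupyter notebook, where I could write to BigQuery using bigquery.LoadJobConfig with autodetect enabled, without supplying a schema.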
Now I'm using an Apache Beam pipeline to write to BigQuery with this schema, but I get errors like these:
WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will retry. Errors were [<InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: ''
location: 'sectiontokens.documents'
message: 'Array specified for non-repeated field.'
reason: 'invalid'>]
index: 0>, <InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: ''
location: 'sectiontokens.errors'
message: 'Array specified for non-repeated field.'
reason: 'invalid'>]
index: 1>, <InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: ''
location: 'sectiontokens.documents'
message: 'Array specified for non-repeated field.'
reason: 'invalid'>]
index: 2>]
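To catch this class of error before rows reach BigQuery, one option is to walk each row alongside the schema and report every path where a Python list is supplied for a field whose mode is not REPEATED. This is a hypothetical helper (find_array_mismatches), a sketch that assumes the schema is available as a plain list of field dicts; it does not reproduce BigQuery's full validation. Paths are lowercased to mirror the location values in the log above.

```python
def find_array_mismatches(row, fields, prefix=""):
    """Return dotted paths where a list is given for a non-REPEATED field."""
    problems = []
    # BigQuery column names are case-insensitive, so match keys case-insensitively
    by_name = {f["name"].lower(): f for f in fields}
    for key, value in row.items():
        spec = by_name.get(key.lower())
        if spec is None:
            continue  # unknown keys are a different error; not checked here
        path = prefix + key.lower()
        repeated = spec.get("mode", "NULLABLE").upper() == "REPEATED"
        if isinstance(value, list) and not repeated:
            problems.append(path)
            continue
        nested = spec.get("fields")
        if nested:
            # Recurse into RECORD values, element by element for REPEATED records
            values = value if repeated and isinstance(value, list) else [value]
            for v in values:
                if isinstance(v, dict):
                    problems.extend(find_array_mismatches(v, nested, path + "."))
    return problems
```

Run against the sample row further down, this kind of check flags sectiontokens.documents and sectiontokens.errors, the same two locations BigQuery rejects.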
My table schema is:
table_schema = {
  "fields": [
    {"name": "ID", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "SourceResourceID", "type": "STRING", "mode": "NULLABLE"},
    {"name": "DocumentText", "type": "STRING", "mode": "NULLABLE"},
    {"name": "DocumentName", "type": "STRING", "mode": "NULLABLE"},
    {"name": "EncounterNumber", "type": "FLOAT", "mode": "NULLABLE"},
    {"name": "EncounterResourceID", "type": "STRING", "mode": "NULLABLE"},
    {"name": "DocumentId", "type": "STRING", "mode": "NULLABLE"},
    {"name": "DocumentDate", "type": "TIMESTAMP", "mode": "NULLABLE"},
    {"name": "SectionTitle", "type": "STRING", "mode": "NULLABLE"},
    {"name": "SectionHeader", "type": "STRING", "mode": "NULLABLE"},
    {"name": "SectionText", "type": "STRING", "mode": "NULLABLE"},
    {"name": "SectionTokens", "type": "RECORD", "mode": "NULLABLE",
      "fields": [
        {"name": "documents", "type": "RECORD", "mode": "NULLABLE",
          "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED",
              "fields": [
                {"name": "item", "type": "RECORD", "mode": "NULLABLE",
                  "fields": [
                    {"name": "entities", "type": "RECORD", "mode": "NULLABLE",
                      "fields": [
                        {"name": "list", "type": "RECORD", "mode": "REPEATED",
                          "fields": [
                            {"name": "item", "type": "RECORD", "mode": "NULLABLE",
                              "fields": [
                                {"name": "category", "type": "STRING", "mode": "NULLABLE"},
                                {"name": "confidenceScore", "type": "FLOAT", "mode": "NULLABLE"},
                                {"name": "id", "type": "STRING", "mode": "NULLABLE"},
                                {"name": "isNegated", "type": "BOOLEAN", "mode": "NULLABLE"},
                                {"name": "length", "type": "INTEGER", "mode": "NULLABLE"},
                                {"name": "links", "type": "RECORD", "mode": "NULLABLE",
                                  "fields": [
                                    {"name": "list", "type": "RECORD", "mode": "REPEATED",
                                      "fields": [
                                        {"name": "item", "type": "RECORD", "mode": "NULLABLE",
                                          "fields": [
                                            {"name": "dataSource", "type": "STRING", "mode": "NULLABLE"},
                                            {"name": "id", "type": "STRING", "mode": "NULLABLE"}
                                          ]}
                                      ]}
                                  ]},
                                {"name": "offset", "type": "INTEGER", "mode": "NULLABLE"},
                                {"name": "text", "type": "STRING", "mode": "NULLABLE"}
                              ]}
                          ]}
                      ]},
                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "relations", "type": "RECORD", "mode": "NULLABLE",
                      "fields": [
                        {"name": "list", "type": "RECORD", "mode": "REPEATED",
                          "fields": [
                            {"name": "item", "type": "RECORD", "mode": "NULLABLE",
                              "fields": [
                                {"name": "bidirectional", "type": "BOOLEAN", "mode": "NULLABLE"},
                                {"name": "relationType", "type": "STRING", "mode": "NULLABLE"},
                                {"name": "source", "type": "STRING", "mode": "NULLABLE"},
                                {"name": "target", "type": "STRING", "mode": "NULLABLE"}
                              ]}
                          ]}
                      ]}
                  ]}
              ]}
          ]},
        {"name": "errors", "type": "RECORD", "mode": "NULLABLE",
          "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED",
              "fields": [
                {"name": "item", "type": "RECORD", "mode": "NULLABLE",
                  "fields": [
                    {"name": "error", "type": "RECORD", "mode": "NULLABLE",
                      "fields": [
                        {"name": "code", "type": "STRING", "mode": "NULLABLE"},
                        {"name": "innererror", "type": "RECORD", "mode": "NULLABLE",
                          "fields": [
                            {"name": "code", "type": "STRING", "mode": "NULLABLE"},
                            {"name": "message", "type": "STRING", "mode": "NULLABLE"}
                          ]},
                        {"name": "message", "type": "STRING", "mode": "NULLABLE"}
                      ]},
                    {"name": "id", "type": "STRING", "mode": "NULLABLE"}
                  ]}
              ]}
          ]},
        {"name": "modelVersion", "type": "STRING", "mode": "NULLABLE"}
      ]}
  ]
}
Here is some sample data:
{'ID': 123,
 'SourceResourceID': 'Resource/3c81b4d2-3ee9-11eb-8bf6-0242ac100303',
 'DocumentText': 'EXAM: CT CHEST IC \n\n\nPROCEDURE DATE: 12/11/2020 \n',
 'DocumentName': 'CT CHEST IC',
 'EncounterNumber': None,
 'EncounterResourceID': 'Encounter/123',
 'DocumentId': '123',
 'DocumentDate': '2020-12-15 10:21:00 UTC',
 'SectionTitle': 'physical_exam',
 'SectionHeader': 'EXAM:',
 'SectionText': 'EXAM: CT CHEST IC \n\n\nPROCEDURE DATE: 12/11/2020 \n \n\n\n',
 'SectionTokens': {'documents': [{'id': '1',
                                  'entities': [{'id': '0',
                                                'offset': 7,
                                                'length': 11,
                                                'text': 'CT CHEST IC',
                                                'category': 'ExaminationName',
                                                'confidenceScore': 0.98,
                                                'isNegated': False}]}],
                   'errors': [],
                   'modelVersion': '2020-09-03'}}
Can anyone help me figure out what I'm doing wrong? Thanks.
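Answer: In your schema, sectiontokens.documents and sectiontokens.errors are declared as RECORD fields with mode NULLABLE, which means BigQuery expects a single record in each of those fields. In your data, however, those keys actually hold lists of objects.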
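Besides changing the schema, another way to resolve this mismatch while keeping the existing table is to reshape each row so that every list is wrapped the way the schema expects: a NULLABLE RECORD holding a REPEATED list field whose elements are {"item": ...} records. A minimal sketch, assuming the schema is available as a plain list of field dicts (for example, from the bq show output) and that a wrapper is recognized purely by this list/item shape; wrap_lists_for_schema is a hypothetical name.

```python
def _is_list_wrapper(spec):
    """True for a RECORD whose only child is a REPEATED `list` RECORD containing only `item`."""
    children = spec.get("fields") or []
    if len(children) != 1 or children[0]["name"] != "list":
        return False
    if children[0].get("mode") != "REPEATED":
        return False
    items = children[0].get("fields") or []
    return len(items) == 1 and items[0]["name"] == "item"


def wrap_lists_for_schema(value, fields):
    """Return a copy of a dict row with Python lists wrapped as {"list": [{"item": x}, ...]}."""
    by_name = {f["name"].lower(): f for f in fields}
    out = {}
    for key, v in value.items():
        spec = by_name.get(key.lower())
        if spec is None or not spec.get("fields"):
            out[key] = v  # scalar column or unknown key: copy unchanged
            continue
        if _is_list_wrapper(spec) and isinstance(v, list):
            item_spec = spec["fields"][0]["fields"][0]
            wrapped = []
            for element in v:
                if isinstance(element, dict) and item_spec.get("fields"):
                    element = wrap_lists_for_schema(element, item_spec["fields"])
                wrapped.append({"item": element})
            out[key] = {"list": wrapped}
        elif isinstance(v, dict):
            out[key] = wrap_lists_for_schema(v, spec["fields"])
        elif isinstance(v, list):
            # Plain REPEATED RECORD: recurse into each element
            out[key] = [wrap_lists_for_schema(e, spec["fields"]) if isinstance(e, dict) else e for e in v]
        else:
            out[key] = v
    return out
```

In a pipeline this would typically run in a beam.Map step just before the write; the trade-off is that every query against the table keeps the extra list/item levels.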
If you want a column to accept a list of objects, it needs "mode": "REPEATED". See:
https://cloud.google.com/bigquery/docs/nested-repeated
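Applied to this schema, that means replacing each list/item wrapper (a NULLABLE RECORD whose only child is a REPEATED list record whose only child is item) with a single REPEATED field that carries the item's type and sub-fields directly, so that documents, entities, links, relations and errors accept plain lists like the sample data. A hedged sketch of that rewrite on the plain dict form of the schema; unwrap_list_item is a hypothetical helper. As far as I know, BigQuery does not support changing an existing column's mode to REPEATED in place, so the result would be used to create a new (or recreated) table.

```python
import copy


def _is_list_wrapper(spec):
    """True for a RECORD whose only child is a REPEATED `list` RECORD containing only `item`."""
    children = spec.get("fields") or []
    if len(children) != 1 or children[0]["name"] != "list":
        return False
    if children[0].get("mode") != "REPEATED":
        return False
    items = children[0].get("fields") or []
    return len(items) == 1 and items[0]["name"] == "item"


def unwrap_list_item(fields):
    """Return a copy of `fields` with every list/item wrapper collapsed into a REPEATED field."""
    result = []
    for spec in fields:
        spec = copy.deepcopy(spec)  # never mutate the caller's schema
        if _is_list_wrapper(spec):
            item = spec["fields"][0]["fields"][0]
            spec["mode"] = "REPEATED"
            spec["type"] = item["type"]
            if "fields" in item:
                spec["fields"] = item["fields"]
            else:
                del spec["fields"]  # the wrapper held a list of scalars
        if spec.get("fields"):
            # Nested wrappers (e.g. entities inside documents) are unwrapped recursively
            spec["fields"] = unwrap_list_item(spec["fields"])
        result.append(spec)
    return result
```

The output can be wrapped back as {"fields": unwrap_list_item(table_schema["fields"])} and passed on as the write's schema, for example through json.dumps and parse_table_schema_from_json.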