如何将 Python 字典映射到大查询模式
Posted
技术标签:
【中文标题】如何将 Python 字典映射到大查询模式【英文标题】:How to map a Python Dict to a Big Query Schema 【发布时间】:2015-07-29 15:37:01 【问题描述】:我有一个包含一些嵌套值的字典,如下所示:
my_dict =
"id": 1,
"name": "test",
"system": "x",
"date": "2015-07-27",
"profile":
"location": "My City",
"preferences": [
"code": "5",
"description": "MyPreference",
]
,
"logins": [
"2015-07-27 07:01:03",
"2015-07-27 08:27:41"
]
而且,我有一个大查询表架构,如下所示:
schema =
"fields": [
'name':'id', 'type':'INTEGER', 'mode':'REQUIRED',
'name':'name', 'type':'STRING', 'mode':'REQUIRED',
'name':'date', 'type':'TIMESTAMP', 'mode':'REQUIRED',
'name':'profile', 'type':'RECORD', 'fields':[
'name':'location', 'type':'STRING', 'mode':'NULLABLE',
'name':'preferences', 'type':'RECORD', 'mode':'REPEATED', 'fields':[
'name':'code', 'type':'STRING', 'mode':'NULLABLE',
'name':'description', 'type':'STRING', 'mode':'NULLABLE'
],
],
'name':'logins', 'type':'TIMESTAMP', 'mode':'REPEATED'
]
我想遍历所有原始的 my_dict 并基于架构的结构构建一个新的 dict。换句话说,遍历模式并从原始 my_dict 中获取正确的值。
要像这样构建一个新的字典(请注意,架构中不存在的字段“系统”不会被复制):
new_dict =
"id": 1,
"name": "test",
"date": "2015-07-27",
"profile":
"location": "My City",
"preferences": [
"code": "5",
"description": "MyPreference",
]
,
"logins": [
"2015-07-27 07:01:03",
"2015-07-27 08:27:41"
]
如果没有嵌套字段迭代简单的 dict.items() 并复制值,这可能会更容易,但是如何构建新的 dict 以递归方式访问原始 dict?
【问题讨论】:
听起来您需要访问者模式的变体...oodesign.com/visitor-pattern.html 【参考方案1】:我已经构建了一个递归函数来执行此操作。我不确定这是否是更好的方法,但有效:
def map_dict_to_bq_schema(source_dict, schema, dest_dict):
#iterate every field from current schema
for field in schema['fields']:
#only work in existant values
if field['name'] in source_dict:
#nested field
if field['type'].lower()=='record' and 'fields' in field:
#list
if 'mode' in field and field['mode'].lower()=='repeated':
dest_dict[field['name']] = []
for item in source_dict[field['name']]:
new_item =
map_dict_to_bq_schema( item, field, new_item )
dest_dict[field['name']].append(new_item)
#record
else:
dest_dict[field['name']] =
map_dict_to_bq_schema( source_dict[field['name']], field, dest_dict[field['name']] )
#list
elif 'mode' in field and field['mode'].lower()=='repeated':
dest_dict[field['name']] = []
for item in source_dict[field['name']]:
dest_dict[field['name']].append(item)
#plain field
else:
dest_dict[field['name']]=source_dict[field['name']]
format_value_bq(source_dict[field['name']], field['type'])
new_dict =
map_dict_to_bq_schema (my_dict, schema, new_dict)
【讨论】:
【参考方案2】:我更新了函数,因为Schemafield
的使用发生了一些变化。
# [START] map_dict_to_bq_schema
# Function to take a dictionary and the bigquery schema
# and return a tuple to feed into bigquery
def map_dict_to_bq_schema(source_dict, schema, dest_dict=None):
if dest_dict is None:
dest_dict = dict()
# Use the existing schema to iterate over all the fields.
# Note: some fields may be nested (those are then flagged as a RECORD)
if not isinstance(schema, list):
# This is an individual field.
schema = [schema]
# List of fields...
for field in schema:
if field.name in source_dict:
# Nested object
if field.field_type == "RECORD" and len(field.fields) > 0:
# This is a nested field.
if field.mode == "REPEATED":
dest_dict[field.name] = []
for item in source_dict[field.name]:
new_item =
# Recursive!
map_dict_to_bq_schema( item, field, new_item )
dest_dict[field.name].append(new_item)
else:
dest_dict[field.name] =
# Recursive!
map_dict_to_bq_schema( source_dict[field.name], field, dest_dict[field.name] )
# Array
elif field.mode == "REPEATED":
if field.name in source_dict:
dest_dict[field.name] = []
for item in source_dict[field.name]:
dest_dict[field.name].append(item)
else:
dest_dict[field.name] = [""]
# Regular field
else:
dest_dict[field.name] = source_dict[field.name]
# Done...
return dest_dict
# [END] map_dict_to_bq_schema
【讨论】:
【参考方案3】:考虑使用schema_from_json:
my_schema = bq_client.schema_from_json('path/to/schema/file.json')
如果您需要架构代码,则可以使用复制表示
my_schema
>>> [SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]
并编辑它:
from google.cloud import bigquery as bq
my_edited_schema = [bq.SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
bq.SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]
【讨论】:
以上是关于如何将 Python 字典映射到大查询模式的主要内容,如果未能解决你的问题,请参考以下文章