如何将 Python 字典映射到大查询模式

Posted

技术标签:

【中文标题】如何将 Python 字典映射到大查询模式【英文标题】:How to map a Python Dict to a Big Query Schema 【发布时间】:2015-07-29 15:37:01 【问题描述】:

我有一个包含一些嵌套值的字典,如下所示:

my_dict = 
    "id": 1,
    "name": "test",
    "system": "x",
    "date": "2015-07-27",
    "profile": 
        "location": "My City",
        "preferences": [
            
                "code": "5",
                "description": "MyPreference",
            
        ]
    ,
    "logins": [
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41"
    ]

而且,我有一个大查询表架构,如下所示:

schema = 
    "fields": [
        'name':'id', 'type':'INTEGER', 'mode':'REQUIRED',
        'name':'name', 'type':'STRING', 'mode':'REQUIRED',
        'name':'date', 'type':'TIMESTAMP', 'mode':'REQUIRED',
        'name':'profile', 'type':'RECORD', 'fields':[
            'name':'location', 'type':'STRING', 'mode':'NULLABLE',
            'name':'preferences', 'type':'RECORD', 'mode':'REPEATED', 'fields':[
                'name':'code', 'type':'STRING', 'mode':'NULLABLE',
                'name':'description', 'type':'STRING', 'mode':'NULLABLE'
            ],
        ],
        'name':'logins', 'type':'TIMESTAMP', 'mode':'REPEATED'
    ]

我想遍历所有原始的 my_dict 并基于架构的结构构建一个新的 dict。换句话说,遍历模式并从原始 my_dict 中获取正确的值。

要像这样构建一个新的字典(请注意,架构中不存在的字段“系统”不会被复制):

new_dict = 
    "id": 1,
    "name": "test",
    "date": "2015-07-27",
    "profile": 
        "location": "My City",
        "preferences": [
            
                "code": "5",
                "description": "MyPreference",
            
        ]
    ,
    "logins": [
        "2015-07-27 07:01:03",
        "2015-07-27 08:27:41"
    ]

如果没有嵌套字段迭代简单的 dict.items() 并复制值,这可能会更容易,但是如何构建新的 dict 以递归方式访问原始 dict?

【问题讨论】:

听起来您需要访问者模式的变体...oodesign.com/visitor-pattern.html 【参考方案1】:

我已经构建了一个递归函数来执行此操作。我不确定这是否是更好的方法,但有效:

def map_dict_to_bq_schema(source_dict, schema, dest_dict):
    #iterate every field from current schema
    for field in schema['fields']:
        #only work in existant values
        if field['name'] in source_dict:
            #nested field
            if field['type'].lower()=='record' and 'fields' in field:
                #list
                if 'mode' in field and field['mode'].lower()=='repeated':
                    dest_dict[field['name']] = []
                    for item in source_dict[field['name']]:
                        new_item = 
                        map_dict_to_bq_schema( item, field, new_item )
                        dest_dict[field['name']].append(new_item)
                #record
                else:
                    dest_dict[field['name']] =  
                    map_dict_to_bq_schema( source_dict[field['name']], field, dest_dict[field['name']] )
            #list
            elif 'mode' in field and field['mode'].lower()=='repeated':
                dest_dict[field['name']] = []
                for item in source_dict[field['name']]:
                    dest_dict[field['name']].append(item)
            #plain field
            else:
                dest_dict[field['name']]=source_dict[field['name']]

                format_value_bq(source_dict[field['name']], field['type'])

new_dict = 
map_dict_to_bq_schema (my_dict, schema, new_dict)

【讨论】:

【参考方案2】:

我更新了函数,因为Schemafield 的使用发生了一些变化。

# [START] map_dict_to_bq_schema
# Function to take a dictionary and the bigquery schema
# and return a tuple to feed into bigquery
def map_dict_to_bq_schema(source_dict, schema, dest_dict=None):
    if dest_dict is None:
        dest_dict = dict()
    # Use the existing schema to iterate over all the fields.
    # Note: some fields may be nested (those are then flagged as a RECORD)
    if not isinstance(schema, list):
        # This is an individual field.
        schema = [schema]
    # List of fields...
    for field in schema:
        if field.name in source_dict:
            # Nested object
            if field.field_type == "RECORD" and len(field.fields) > 0:
                # This is a nested field.
                if field.mode == "REPEATED":
                    dest_dict[field.name] = []
                    for item in source_dict[field.name]:
                        new_item = 
                        # Recursive!
                        map_dict_to_bq_schema( item, field, new_item )
                        dest_dict[field.name].append(new_item)
                else:
                    dest_dict[field.name] = 
                    # Recursive!
                    map_dict_to_bq_schema( source_dict[field.name], field, dest_dict[field.name] )
            # Array
            elif field.mode == "REPEATED":
                if field.name in source_dict:
                    dest_dict[field.name] = []
                    for item in source_dict[field.name]:
                        dest_dict[field.name].append(item)
                else:
                    dest_dict[field.name] = [""]
            # Regular field
            else:
                dest_dict[field.name] = source_dict[field.name]
    # Done...
    return dest_dict
# [END] map_dict_to_bq_schema

【讨论】:

【参考方案3】:

考虑使用schema_from_json:

my_schema = bq_client.schema_from_json('path/to/schema/file.json')

如果您需要架构代码,则可以使用复制表示

my_schema
>>> [SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]

并编辑它:

from google.cloud import bigquery as bq
my_edited_schema = [bq.SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
bq.SchemaField('address', 'STRING', 'NULLABLE', None, (), None)]

【讨论】:

以上是关于如何将 Python 字典映射到大查询模式的主要内容,如果未能解决你的问题,请参考以下文章

如何通过创建字典从python中的以下映射访问城市名称

如何将 Graphql 查询转换为 python 字典?

如何将CSV文件转换为python字典

如何在 Python 中将字典转换为查询字符串?

如何在Python中创建一个递归函数来创建一个映射Odoo 8关系字段记录的字典?

如何在不创建架构的情况下将 CSV 文件加载到 BigQuery