如何展平多级/嵌套 JSON?

Posted

技术标签:

【中文标题】如何展平多级/嵌套 JSON?【英文标题】:How to flatten multilevel/nested JSON? 【发布时间】:2018-12-23 21:08:21 【问题描述】:

我正在尝试将 JSON 转换为 CSV 文件,以便进行进一步分析。我的结构的问题是,当我转换我的 JSON 文件时,我有很多嵌套的字典/列表。

我尝试使用 pandas json_normalize(),但它只压平了第一级。

import json
import pandas as pd
from pandas.io.json import json_normalize
from cs import CloudStack

api_key = xxxx
secret = xxxx
endpoint = xxxx

cs = CloudStack(endpoint=endpoint,
                key=api_key,
                secret=secret)

virtual_machines = cs.virtMach()

test = json_normalize(virtual_machines["virtualmachine"])

test.to_csv("test.csv", sep="|", index=False)

知道如何整理整个 JSON 文件,以便为单个(在本例中为虚拟机)条目创建 CSV 文件的单行输入吗?我尝试了这里发布的几个解决方案,但我的结果总是只有第一级被压平。

这是示例 JSON(在这种情况下,我仍然得到 JSON 格式的“securitygroup”和“nic”输出:


    "count": 13,
    "virtualmachine": [
        
            "id": "1082e2ed-ff66-40b1-a41b-26061afd4a0b",
            "name": "test-2",
            "displayname": "test-2",
            "securitygroup": [
                
                    "id": "9e649fbc-3e64-4395-9629-5e1215b34e58",
                    "name": "test",
                    "tags": []
                
            ],
            "nic": [
                
                    "id": "79568b14-b377-4d4f-b024-87dc22492b8e",
                    "networkid": "05c0e278-7ab4-4a6d-aa9c-3158620b6471"
                ,
                
                    "id": "3d7f2818-1f19-46e7-aa98-956526c5b1ad",
                    "networkid": "b4648cfd-0795-43fc-9e50-6ee9ddefc5bd"
                    "traffictype": "Guest"
                
            ],
            "hypervisor": "KVM",
            "affinitygroup": [],
            "isdynamicallyscalable": false
        
    ]

【问题讨论】:

有很好的例子here - 那里提到的 flatten json 函数应该完全符合您的要求。让我知道这是否有帮助 您好,这个链接确实很有帮助。部分解决了我的问题,尽管现在一切都变平了,而不仅仅是内部字典。但是我在那里也发现了完全相同的问题,这导致了 json_normalization() 的文档,它表明您可以指定导出深度。 link 是的json_normalize 非常有用!试一试,告诉我们进展如何。 所以基本上这行得通 - 我使用了 json_normalization(),我在上面的链接中定义了输出结构。再次感谢 gyx 的所有帮助。您也可以将其发布为答案,以便我将其标记为解决方案吗? 太好了。不,您可以发布自己的答案(因为您已回答)并将其标记为答案:) 【参考方案1】:

我使用了如下函数(详情可查看here):

def flatten_data(y):
    out = 

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

不幸的是,这将整个 JSON 完全展平,这意味着如果您有多级 JSON(许多嵌套字典),它可能会将所有内容展平为包含大量列的单行。

最后我使用的是json_normalize() 并指定了我需要的结构。可以在here 找到一个很好的例子来说明如何做到这一点。

【讨论】:

【参考方案2】:

IMO accepted answer 无法正确处理 JSON 数组。

如果 JSON 对象具有数组作为值,那么它应该被展平为对象数组,例如

'a': [1, 2] -> ['a': 1, 'a': 2]

而不是向键添加索引。

嵌套对象应该通过连接键(例如用点作为分隔符)来展平

'a': 'b': 1 -> 'a.b': 1

(这在接受的情况下是正确完成的)。

根据所有这些要求,我最终得到了以下(在 CPython3.5.3 中开发和使用):

from functools import (partial,
                       singledispatch)
from itertools import chain
from typing import (Dict,
                    List,
                    TypeVar)

Serializable = TypeVar('Serializable', None, int, bool, float, str, 
                       dict, list, tuple)
Array = List[Serializable]
Object = Dict[str, Serializable]


def flatten(object_: Object,
            *,
            path_separator: str = '.') -> Array[Object]:
    """
    Flattens given JSON object into list of objects with non-nested values.

    >>> flatten('a': 1)
    ['a': 1]
    >>> flatten('a': [1, 2])
    ['a': 1, 'a': 2]
    >>> flatten('a': 'b': None)
    ['a.b': None]
    """
    keys = set(object_)
    result = [dict(object_)]
    while keys:
        key = keys.pop()
        new_result = []
        for index, record in enumerate(result):
            try:
                value = record[key]
            except KeyError:
                new_result.append(record)
            else:
                if isinstance(value, dict):
                    del record[key]
                    new_value = flatten_nested_objects(
                            value,
                            prefix=key + path_separator,
                            path_separator=path_separator)
                    keys.update(new_value.keys())
                    new_result.append(**new_value, **record)
                elif isinstance(value, list):
                    del record[key]
                    new_records = [
                        flatten_nested_objects(sub_value,
                                               prefix=key + path_separator,
                                               path_separator=path_separator)
                        for sub_value in value]
                    keys.update(chain.from_iterable(map(dict.keys,
                                                        new_records)))
                    new_result.extend(**new_record, **record
                                      for new_record in new_records)
                else:
                    new_result.append(record)
        result = new_result
    return result


@singledispatch
def flatten_nested_objects(object_: Serializable,
                           *,
                           prefix: str = '',
                           path_separator: str) -> Object:
    return prefix[:-len(path_separator)]: object_


@flatten_nested_objects.register(dict)
def _(object_: Object,
      *,
      prefix: str = '',
      path_separator: str) -> Object:
    result = dict(object_)
    for key in list(result):
        result.update(flatten_nested_objects(result.pop(key),
                                             prefix=(prefix + key
                                                     + path_separator),
                                             path_separator=path_separator))
    return result


@flatten_nested_objects.register(list)
def _(object_: Array,
      *,
      prefix: str = '',
      path_separator: str) -> Object:
    return prefix[:-len(path_separator)]: list(map(partial(
            flatten_nested_objects,
            path_separator=path_separator),
            object_))

【讨论】:

如果我在一个嵌套对象上调用 flatten,我不会再期待任何子元素了。我希望单个***可迭代,而不是 可能 包含子结构的可迭代。我想说你的实现返回一个扁平项目的列表,但不是返回的值是扁平的(除非你提供一个已经扁平的列表,例如:[1,2,3,4,5])。【参考方案3】:

从 https://***.com/a/62186053/4355695 交叉发布(但随后进一步调整):在这个 repo:https://github.com/ScriptSmith/socialreaper/blob/master/socialreaper/tools.py#L8 中,我找到了 list-inclusion comment by @roneo 到 answer posted by @Imran 的实现。

我添加了检查以捕获空列表和空字典。并且还添加了打印行,这将帮助人们准确理解此功能的工作原理。你可以通过设置crumbs=False来关闭那些打印语句

from collections import MutableMapping
crumbs = True
def flatten(dictionary, parent_key=False, separator='.'):
    """
    Turn a nested dictionary into a flattened dictionary
    :param dictionary: The dictionary to flatten
    :param parent_key: The string to prepend to dictionary's keys
    :param separator: The string used to separate flattened keys
    :return: A flattened dictionary
    """

    items = []
    for key, value in dictionary.items():
        if crumbs: print('checking:',key)
        new_key = str(parent_key) + separator + key if parent_key else key
        if isinstance(value, MutableMapping):
            if crumbs: print(new_key,': dict found')
            if not value.items():
                if crumbs: print('Adding key-value pair:',new_key,None)
                items.append((new_key,None))
            else:
                items.extend(flatten(value, new_key, separator).items())
        elif isinstance(value, list):
            if crumbs: print(new_key,': list found')
            if len(value):
                for k, v in enumerate(value):
                    items.extend(flatten(str(k): v, new_key).items())
            else:
                if crumbs: print('Adding key-value pair:',new_key,None)
                items.append((new_key,None))
        else:
            if crumbs: print('Adding key-value pair:',new_key,value)
            items.append((new_key, value))
    return dict(items)

测试一下:

ans = flatten('a': 1, 'c': 'a': 2, 'b': 'x': 5, 'y' : 10, 'd': [1, 2, 3], 'e':'f':[], 'g': )
print('\nflattened:',ans)

输出:

checking: a
Adding key-value pair: a 1
checking: c
c : dict found
checking: a
Adding key-value pair: c.a 2
checking: b
c.b : dict found
checking: x
Adding key-value pair: c.b.x 5
checking: y
Adding key-value pair: c.b.y 10
checking: d
d : list found
checking: 0
Adding key-value pair: d.0 1
checking: 1
Adding key-value pair: d.1 2
checking: 2
Adding key-value pair: d.2 3
checking: e
e : dict found
checking: f
e.f : list found
Adding key-value pair: e.f None
checking: g
e.g : dict found
Adding key-value pair: e.g None

flattened: 'a': 1, 'c.a': 2, 'c.b.x': 5, 'c.b.y': 10, 'd.0': 1, 'd.1': 2, 'd.2': 3, 'e.f': None, 'e.g': None

这完成了我需要完成的工作:我将任何复杂的 json 扔到这里,它对我来说是扁平的。我也在原始代码中添加了一个检查来处理空列表

感谢https://github.com/ScriptSmith,我在其仓库中找到了最初的 flatten 函数。

测试 OP 的示例 json,输出如下:

'count': 13,
 'virtualmachine.0.id': '1082e2ed-ff66-40b1-a41b-26061afd4a0b',
 'virtualmachine.0.name': 'test-2',
 'virtualmachine.0.displayname': 'test-2',
 'virtualmachine.0.securitygroup.0.id': '9e649fbc-3e64-4395-9629-5e1215b34e58',
 'virtualmachine.0.securitygroup.0.name': 'test',
 'virtualmachine.0.securitygroup.0.tags': None,
 'virtualmachine.0.nic.0.id': '79568b14-b377-4d4f-b024-87dc22492b8e',
 'virtualmachine.0.nic.0.networkid': '05c0e278-7ab4-4a6d-aa9c-3158620b6471',
 'virtualmachine.0.nic.1.id': '3d7f2818-1f19-46e7-aa98-956526c5b1ad',
 'virtualmachine.0.nic.1.networkid': 'b4648cfd-0795-43fc-9e50-6ee9ddefc5bd',
 'virtualmachine.0.nic.1.traffictype': 'Guest',
 'virtualmachine.0.hypervisor': 'KVM',
 'virtualmachine.0.affinitygroup': None,
 'virtualmachine.0.isdynamicallyscalable': False

所以您会看到“tags”和“affinitygroup”键也被处理并添加到输出中。原始代码省略了它们。

2021-05-30 :更新:collections.MutableMapping 更改为 collections.abc.MutableMapping

【讨论】:

【参考方案4】:

如果其他人发现自己在这里并正在寻找更适合后续程序化处理的解决方案:

扁平化列表需要处理列表长度的标题等。我想要一个解决方案,如果有 2 个列表,例如2 个元素,然后将生成四行,产生每个有效的潜在数据行(请参阅下面的实际示例):

class MapFlattener:

    def __init__(self):
        self.headings = []
        self.rows = []

    def add_rows(self, headings, rows):
        self.headings = [*self.headings, *headings]
        if self.rows:
            new_rows = []
            for base_row in self.rows:
                for row in rows:
                    new_rows.append([*base_row, *row])
            self.rows = new_rows
        else:
            self.rows = rows

    def __call__(self, mapping):
        for heading, value in mapping.items():
            if isinstance(value, Mapping):
                sub_headings, sub_rows = MapFlattener()(value)
                sub_headings = [f'heading:sub_heading' for sub_heading in sub_headings]
                self.add_rows(sub_headings, sub_rows)
                continue

            if isinstance(value, list):
                self.add_rows([heading], [[e] for e in value])
                continue

            self.add_rows([heading], [[value]])

        return self.headings, self.rows


def map_flatten(mapping):
    return MapFlattener()(mapping)

这会创建更符合关系数据的输出:

In [22]: map_flatten('l': [1,2])                                                                                                          
Out[22]: (['l'], [[1], [2]])

In [23]: map_flatten('l': [1,2], 'n': 7)                                                                                                  
Out[23]: (['l', 'n'], [[1, 7], [2, 7]])

In [24]: map_flatten('l': [1,2], 'n': 7, 'o': 'a': 1, 'b': 2)                                                                           
Out[24]: (['l', 'n', 'o:a', 'o:b'], [[1, 7, 1, 2], [2, 7, 1, 2]])

如果您在电子表格等中使用 csv 并需要处理展平数据,这将特别有用。

【讨论】:

【参考方案5】:

我尝试使用 BFS 方法,仅当 val 是 dict 类型时,我才将 (parent,val) 存储在队列中。

def flattern_json(d):
    if len(d) == 0:
        return 
    from collections import deque
    q = deque()
    res = dict()
    for key, val in d.items(): # This loop push the top most keys and values into queue.
        if not isinstance(val, dict):  # If it's not dict
            if isinstance(val, list):  # If it's list then check list values if it contains dict object.
                temp = list()  # Creating temp list for storing the values that we will need which are not dict.
                for v in val:
                    if not isinstance(v, dict):
                        temp.append(v)
                    else:
                        q.append((key, v))  # if it's value is dict type then we push along with parent which is key.
                if len(temp) > 0:
                    res[key] = temp
            else:
                res[key] = val
        else:
            q.append((key, val))
    while q:
        k, v = q.popleft()  # Taking parent and the value out of queue
        for key, val in v.items():
            new_parent = k + "_" + key  # New parent will be old parent_currentval
            if isinstance(val, list):
                temp = list()
                for v in val:
                    if not isinstance(v, dict):
                        temp.append(v)
                    else:
                        q.append((new_parent, v))
                if len(temp) >= 0:
                    res[new_parent] = temp
            elif not isinstance(val, dict):
                res[new_parent] = val
            else:
                q.append((new_parent, val))
    return res

它使用给定的 JSON,我附加 _ 用于展平 JSON,而不是使用 0 1 列表索引。

from pprint import pprint

print(pprint.pprint(flattern_json(d)))

它给出了以下输出:

'count': 13,
 'virtualmachine_affinitygroup': [],
 'virtualmachine_displayname': 'test-2',
 'virtualmachine_hypervisor': 'KVM',
 'virtualmachine_id': '1082e2ed-ff66-40b1-a41b-26061afd4a0b',
 'virtualmachine_isdynamicallyscalable': False,
 'virtualmachine_name': 'test-2',
 'virtualmachine_nic': [],
 'virtualmachine_nic_id': '3d7f2818-1f19-46e7-aa98-956526c5b1ad',
 'virtualmachine_nic_networkid': 'b4648cfd-0795-43fc-9e50-6ee9ddefc5bd',
 'virtualmachine_nic_traffictype': 'Guest',
 'virtualmachine_securitygroup': [],
 'virtualmachine_securitygroup_id': '9e649fbc-3e64-4395-9629-5e1215b34e58',
 'virtualmachine_securitygroup_name': 'test',
 'virtualmachine_securitygroup_tags': []

【讨论】:

【参考方案6】:

我使用这个简单的函数将数据标准化和展平为 json。 它接受列表、字典、元组并将其展平为 json。

def normalize_data_to_json(raw_data: [list, dict, tuple], parent=""):
    from datetime import datetime
    from decimal import Decimal

    result = 
    # key name normalise to snake case (single underscore)
    parent = parent.lower().replace(" ", "_") if isinstance(parent, str) else parent
    if isinstance(parent, str) and parent.startswith("__"):
        # if parent has no parent remove double underscore and treat as int if digit else as str
        # treating as int is better if passed data is a list so you output is index based dict
        parent = int(parent.lstrip("_")) if parent.lstrip("_").isdigit() else parent.lstrip("_")

    # handle str, int, float, and decimal.
    # you can easily add more data types as er your data
    if type(raw_data) in [str, int, float, Decimal]:
        result[parent] = float(raw_data) if isinstance(raw_data, Decimal) else raw_data

    # normalise datetime object
    elif isinstance(raw_data, datetime):
        result[parent] = raw_data.strftime("%Y-%m-%d %H:%M:%S")

    # normalise dict and all nested dicts.
    # all nests are joined with double underscore to identify parent key name with it's children
    elif isinstance(raw_data, dict):
        for k, v in raw_data.items():
            k = f'parent__k' if parent else k
            result.update(normalize_data_to_json(v, parent=k))

    # normalise list and tuple
    elif type(raw_data) in [list, tuple]:
        for i, sub_item in enumerate(raw_data, start=1):
            result.update(normalize_data_to_json(sub_item, f"parent__i"))

    # any data which did not matched above data types, normalise them using it's __str__
    else:
        result[parent] = str(raw_data)

    return result

【讨论】:

【参考方案7】:

在这里传递你的字典:

def getKeyValuePair(dic,master_dic = ,master_key = None):
    keys = list(dic.keys())
    for key in keys:
        if type(dic[key]) == dict:
                getKeyValuePair(dic[key],master_dic = master_dic,master_key = key)
        else:
            if master_key == None:
                master_dic[key] = dic[key]
            else:
                master_dic[str(master_key)+'_'+str(key)] = dic[key]

   return master_dic

【讨论】:

请更正你的缩进,也请解释你的代码在通过一些字典后不起作用。【参考方案8】:

以jsonpath格式输出:

def convert(f):
    out = 
    def flatten(x, name=None):
        if type(x) is dict:
            for a in x:
                val = '.'.join((name, a)) if name else a
                flatten(x[a], val)
        elif type(x) is list:
            for (i, a) in enumerate(x):
                flatten(a, name + f'[str(i)]')
        else:
            out[name] = x if x else ""
    flatten(f)
    return out

【讨论】:

以上是关于如何展平多级/嵌套 JSON?的主要内容,如果未能解决你的问题,请参考以下文章

如何在python中将展平表转换为嵌套(分层)json?

如何使用 Java 手动展平 Elasticsearch 嵌套的 JSON 文档?

如何在 Azure 流分析中展平嵌套的 json 数据

如何使用 flatten_json 递归地展平嵌套的 JSON

如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?

生成嵌套 JSON(反向横向展平)