python:以灵活的方式处理深度嵌套数据的有效技术是啥?

Posted

技术标签:

【中文标题】python:以灵活的方式处理深度嵌套数据的有效技术是啥?【英文标题】:python: what are efficient techniques to deal with deeply nested data in a flexible manner?python:以灵活的方式处理深度嵌套数据的有效技术是什么? 【发布时间】:2011-02-02 09:31:28 【问题描述】:

我的问题不是关于特定的代码 sn-p 而是更笼统的,所以请多多包涵:

我应该如何组织我正在分析的数据,我应该使用哪些工具来管理它?

我正在使用 python 和 numpy 来分析数据。因为python文档表明字典在python中是非常优化的,而且由于数据本身非常结构化,所以我将它存储在一个深度嵌套的字典中。

这里是字典的骨架:层次结构中的位置定义了元素的性质,每个新行定义了前级中一个键的内容:

[AS091209M02] [AS091209M01] [AS090901M06] ... 
[100113] [100211] [100128] [100121] 
[R16] [R17] [R03] [R15] [R05] [R04] [R07] ... 
[1263399103] ... 
[ImageSize] [FilePath] [Trials] [Depth] [Frames] [Responses] ... 
[N01] [N04] ... 
[Sequential] [Randomized] 
[Ch1] [Ch2]

编辑:为了更好地解释我的数据集:

[individual] ex: [AS091209M02]
[imaging session (date string)] ex: [100113]
[Region imaged] ex: [R16]
[timestamp of file] ex [1263399103]  
[properties of file] ex: [Responses]
[regions of interest in image ] ex [N01]
[format of data] ex [Sequential]
[channel of acquisition: this key indexes an array of values] ex [Ch1]

我执行的操作类型是例如计算数组的属性(列在 Ch1、Ch2 下),拾取数组以创建新集合,例如分析来自给定区域 16 (R16) 的 N01 的响应个人在不同时间点等。

这种结构对我来说效果很好,而且速度非常快,正如我所承诺的那样。我可以很快分析完整的数据集(而且字典太小,无法填满我电脑的内存:半个演出)。

我的问题来自于我需要对字典的操作进行编程的繁琐方式。我经常有这样一段代码:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

它丑陋、笨重、不可重复使用且易碎(需要为字典的任何变体重新编码)。

我尝试使用递归函数,但除了最简单的应用程序之外,我还遇到了一些非常讨厌的错误和奇怪的行为,这导致了大量的时间浪费(我没有设法在 ipython 中使用 pdb 进行调试,这无济于事当我处理深度嵌套的递归函数时)。最后,我经常使用的唯一递归函数如下:

def dicExplorer(dic, depth = -1, stp = 0):
    '''prints the hierarchy of a dictionary.
    if depth not specified, will explore all the dictionary
    '''
    if depth - stp == 0: return
    try : list_keys = dic.keys()
    except AttributeError: return
    stp += 1
    for key in list_keys:
        else: print '+%s> [\'%s\']' %(stp * '---', key)
        dicExplorer(dic[key], depth, stp)

我知道我做错了,因为我的代码很长、简单且不可重用。我需要使用更好的技术来灵活地操作字典,或者将数据放入某种数据库格式(sqlite?)。我的问题是,由于我在编程方面(非常)自学,我缺乏实践经验和背景知识来欣赏可用的选项。我已经准备好学习新工具(SQL、面向对象的编程),不惜一切代价来完成工作,但我不愿意将我的时间和精力投入到对我的需求来说是死胡同的事情上。

那么对于解决这个问题,以及能够以更简洁、灵活和可重用的方式编写我的工具,您有什么建议?

附录:除了对数据字典的特定子字典进行操作之外,以下是我为数据集 dic 或其子字典实现的一些操作示例:

实际上我有一些运行良好的递归函数:

def normalizeSeqDic(dic, norm_dic = , legend = ()):
    '''returns a normalized dictionary from a seq_amp_dic. Normalization is performed using the first time point as reference
    '''
    try : 
        list_keys = dic.keys()
        for key in list_keys:
            next_legend = legend + (key,) 
            normalizeSeqDic(dic[key], norm_dic, next_legend)
    except AttributeError:
        # normalization
        # unpack list
        mk, ek, nk, tpk = legend
        #assign values to amplitude dict
        if mk not in norm_dic: norm_dic[mk] = 
        if ek not in norm_dic[mk]: norm_dic[mk][ek] = 
        if nk not in norm_dic[mk][ek]: norm_dic[mk][ek][nk] = 
        if tpk not in norm_dic[mk][ek][nk]: norm_dic[mk][ek][nk][tpk] = 
        new_array = []
        for x in range(dic.shape[0]):
            new_array.append(dic[x][1:]/dic[x][0])
        new_array = asarray(new_array)
        norm_dic[mk][ek][nk][tpk] = new_array
    return norm_dic

def poolDic(dic):
    '''returns a dic in which all the values are pooled, and root (mk) keys are fused
    these pooled dics can later be combined into another dic
    '''
    pooled_dic = 
    for mk in dic.keys():
        for ek in dic[mk].keys():
            for nk in dic[mk][ek].keys():
                for tpk in dic[mk][ek][nk].keys():
                    #assign values to amplitude dict
                    if ek not in pooled_dic: pooled_dic[ek] = 
                    if nk not in pooled_dic[ek]: pooled_dic[ek][nk] = 
                    if tpk not in pooled_dic[ek][nk]:
                        pooled_dic[ek][nk][tpk] = dic[mk][ek][nk][tpk]
                    else: pooled_dic[ek][nk][tpk]= vstack((pooled_dic[ek][nk][tpk], dic[mk][ek][nk][tpk]))
    return pooled_dic

def timePointsDic(dic):
    '''Determines the timepoints for each individual key at root
    '''
    tp_dic = 
    for mk in dic.keys():
        tp_list = []
        for rgk in dic[mk].keys():
            tp_list.extend(dic[mk][rgk]['Neuropil'].keys())
        tp_dic[mk]=tuple(sorted(list(set(tp_list))))
    return tp_dic

对于某些操作,我发现除了展平字典之外别无他法:

def flattenDic(dic, label):
    '''flattens a dic to produce a list of of tuples containing keys and 'label' values
    '''
    flat_list = []
    for mk in dic.keys():
        for rgk in dic[mk].keys():
            for nk in dic[mk][rgk].keys():
                for ik in dic[mk][rgk][nk].keys():
                    for ek in dic[mk][rgk][nk][ik].keys():
                        flat_list.append((mk, rgk, nk, ik, ek, dic[mk][rgk][nk][ik][ek][label])
    return flat_list

def extractDataSequencePoints(flat_list, mk, nk, tp_list):
        '''produces a list containing arrays of time point values
        time_points is a list of the time points wished (can have 2 or 3 elements)
        '''
        nb_tp = len(tp_list)
        # build tp_seq list
        tp_seq = []
        tp1, tp2, tp3 = [], [], []
        if nk == 'Neuropil':
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil' and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and  x[3] == tp_list[1])
        else:
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[1])
        if nb_tp == 3:
            if nk == 'Neuropil':
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and x[3] == tp_list[2])
            else:
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[2])
        for x in tp1:
            for y in tp2:
                if x[0:3] == y[0:3] :
                    if nb_tp == 3:
                        for z in tp3:
                            if x[0:3] == z[0:3] :
                                tp_seq.append(asarray([x[4],y[4],z[4]]))
                    else:
                        tp_seq.append(asarray([x[4],y[4]]))
        return tp_seq

【问题讨论】:

@AlexandreS:恐怕我对您的样本数据了解得不够多,无法提供很多建议。请您详细说明您正在分析的数据以及您正在执行的分析? @MattH:我编辑了问题以提供更多详细信息。如果还不够,请告诉我 @AlexandrS:感谢您的澄清。如果您能解释目前如何获取/存储/派生这些数据,可能会有所帮助。我认为前进的道路是让您制作一个抽象图,将您的结构化为具有属性的对象以及对象/属性如何相互关联。当我决定如何编写数据结构时,我经常会勾勒出这些东西。 我不认为花在学习 OOP 和 SQL 上的时间被浪费到了死胡同。自学没什么不好。编码通常归结为为工作选择正确的工具。自学的一个特别困难是熟悉足够广泛的工具集。面向对象编程和提供结构化查询语言的数据库是现代编码工具集的重要组成部分,只有了解它们,您才会成为更好的编码人员。 我实际上是在开始编写分析例程代码之前勾画了数据结构,但我的部分问题是我缺乏足够的文化来了解哪些概念最适合我的需求。当然,学习 OOP 或 SQL 永远不会浪费。但是我需要优化我的学习路径,以便在我可以留出的有限时间内获得最好的收益来提高我的技能。希望通过借鉴其他人的经验,我可以限制我必须经历的试错次数。 【参考方案1】:

“我将它存储在一个嵌套很深的字典中”

而且,正如你所见,效果并不好。

还有什么选择?

    复合键和浅字典。你有一个 8 部分的密钥: (个人、成像会话、成像区域、文件时间戳、文件属性、图像中感兴趣的区域、数据格式、采集通道) 到一个值数组。

     ('AS091209M02', '100113', 'R16', '1263399103', 'Responses', 'N01', 'Sequential', 'Ch1' ): array, 
    ...
    

    问题在于搜索。

    正确的类结构。实际上,完整的类定义可能有点过头了。

“我执行的操作类型是例如计算数组的属性 (列在 Ch1、Ch2 下),拿起数组来做一个新的集合,例如分析 N01 来自给定个体在不同时间点的区域 16 (R16) 的响应等"

推荐

首先,使用namedtuple 作为您的终极对象。

Array = namedtuple( 'Array', 'individual, session, region, timestamp, properties, roi, format, channel, data' )

或者类似的东西。构建这些命名元组对象的简单列表。然后,您可以简单地遍历它们。

其次,在这个数组对象的主列表上使用许多简单的 map-reduce 操作。

过滤:

for a in theMasterArrrayList:
    if a.region = 'R16' and interest = 'N01':
        # do something on these items only.

通过公共密钥减少:

individual_dict = defaultdict(list)
for a in theMasterArrayList:
    individual_dict[ a.individual ].append( a )

这将在地图中创建一个子集,其中包含您想要的项目。

然后您可以执行 indiidual_dict['AS091209M02'] 并拥有他们的所有数据。您可以对任何(或所有)可用键执行此操作。

region_dict = defaultdict(list)
for a in theMasterArrayList:
    region_dict[ a.region ].append( a )

这不会复制任何数据。它速度快,内存相对紧凑。

映射(或转换)数组:

for a in theMasterArrayList:
    someTransformationFunction( a.data )

如果数组本身是一个列表,您可以在不破坏整个元组的情况下更新该列表。如果您需要从现有数组创建新数组,则您正在创建一个新元组。这没有什么问题,但它是一个新的元组。你最终会得到这样的程序。

def region_filter( array_list, region_set ):
    for a in array_list:
        if a.region in region_set:
            yield a

def array_map( array_list, someConstant ):
    for a in array_list:
        yield Array( *(a[:8] + (someTranformation( a.data, someConstant ),) )

def some_result( array_list, region, someConstant ):
    for a in array_map( region_filter( array_list, region ), someConstant ):
        yield a

您可以将转换、归约、映射构建成更精细的事物。

最重要的是只从主列表中创建您需要的字典,这样您就不会再做超出最低限度需要的过滤。

顺便说一句。这可以很容易地映射到关系数据库。它会更慢,但您可以有多个并发更新操作。除了多个并发更新之外,关系数据库不提供任何上述功能。

【讨论】:

这很有帮助。我已经采用扁平化数据字典或(它的子集)+ 列表推导来过滤数据,但是您对命名元组使用的描述使语法和代码比我所拥有的更清晰。另外,它涉及到获得理想结果的最少学习,所以我想我会为此而努力,至少目前是这样。【参考方案2】:

您可以通过替换使您的循环看起来更好:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

for mv in dic.values():
    for rgv in mv.values():
        for nv in rgv.values():
            for iv in nv.values():
                for ev in iv.values():
                    #do something

因此,您可以使用相对简洁的代码访问所有值。如果您还需要一些密钥,您可以执行以下操作:

for (mk, mv) in dic.items():
    # etc.

根据您的需要,您还可以考虑创建并使用带有元组键的单个字典:

dic[(mk, rgk, nv, ik, ek)]

【讨论】:

迭代字典对象会返回字典的键。然后,您不能迭代键并期望访问该值。 我尝试了第一个块,但在 for rgv in mv: 我看到 rgv 遍历 mv 字符串,而不是 mv.keys()?【参考方案3】:

我将分享一些关于此的想法。而不是这个函数:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

你想简单地写成:

for ek in deep_loop(dic):
    do_something

有两种方法。一是功能性的,二是类似生成器的。第二个是:

def deep_loop(dic):
    for mk in dic.keys():
        for rgk in dic[mk].keys():
            for nk in dic[mk][rgk].keys():
                for ik in dic[mk][rgk][nk].keys():
                    for ek in dic[mk][rgk][nk][ik].keys():
                        yield ek

这允许您捕获遍历字典的逻辑。很容易修改这个函数来支持不同的结构遍历方式。这取决于你的结构改变的方式,如果它只是循环的深度或不同的东西。您能否发布一些更高级的示例,说明您对遍历树的要求是什么?像过滤,搜索等?深度看起来像这样(未经测试)-它将产生一对(键元组),(值):

def deep_loop(dic, depth):
    if depth == 0:
        yield (), dic
    for subkey, subval in dic.items():
        for ktuple, value in deep_loop(subval, depth-1):
            yield (subkey,)+ktuple, value

现在变得更容易了:

for (k1,k2,k3,k4), value in deep_loop(dic, 4):
    # do something

还有其他方法可以自定义它,您可以添加命名元组类型作为 deep_loop 的参数。 Deep_loop 可以自动检测命名元组的深度并返回命名元组。

【讨论】:

我发布了我设法实现的附加示例作为附录,希望对您有所帮助。您的第一个解决方案将搜索过程打包在一个函数中,但不能解决我的大部分问题,因为搜索通常是可变的,我必须定义许多函数来描述每个搜索。您的第二个选项解决了这一点,但使用了递归,这让我在经历了糟糕的体验后有点害怕(尽管我设法编写了一些有效的递归函数)。但是感谢您向我展示了您如何将 dic.items() 与生成器表达式相结合,这对我很有用。【参考方案4】:

你问:我应该如何组织我正在分析的数据,我应该使用哪些工具来管理它?

我怀疑字典虽然经过优化,但并不是该问题的正确答案。我认为您最好使用 XML,或者,如果有 Python 绑定,HDF5,甚至 NetCDF。或者,正如您自己建议的那样,一个数据库。

如果您的项目有足够的持续时间和有用性来保证学习如何使用这些技术,那么我认为您会发现现在学习它们并正确使用数据结构是比与错误数据搏斗更好的成功之路整个项目的结构。学习 XML、HDF5、SQL 或任何你选择的东西,都在建立你的一般专业知识,让你能够更好地处理下一个项目。坚持使用笨拙、特定问题和特殊的数据结构会导致下一次出现同样的问题。

【讨论】:

感谢您让我了解 HDF5 和 NetCDF。 Python 对两者都有绑定,并且许可证是可用的。我想我将从阅读 HDF5 开始,如果它看起来很有希望,我将研究如何将它与我的数据一起使用。这些类型的指针正是我写问题时所希望的。【参考方案5】:

您可以编写一个生成器函数,允许您迭代某个级别的所有元素:

def elementsAt(dic, level):
    if not hasattr(dic, 'itervalues'):
        return
    for element in dic.itervalues():
        if level == 0:
            yield element
        else:
            for subelement in elementsAt(element, level - 1):
                yield subelement

然后可以如下使用:

for element in elementsAt(dic, 4):
    # Do something with element

如果您还需要过滤元素,您可以先获取所有需要过滤的元素(例如,'rgk' 级别):

for rgk in getElementsAt(dic, 1):
    if isValid(rgk):
        for ek in getElementsAt(rgk, 2):
            # Do something with ek

至少这会使使用字典层次结构变得更容易一些。使用更具描述性的名称也会有所帮助。

【讨论】:

感谢您向我展示了 dic.itervalues() 的用法,到目前为止我还没有意识到这一点。问题是它并没有真正解决链的深度嵌套和不灵活的问题。

以上是关于python:以灵活的方式处理深度嵌套数据的有效技术是啥?的主要内容,如果未能解决你的问题,请参考以下文章

数据结构与算法之深入解析“有效括号的嵌套深度”的求解思路与算法示例

在 Python 中处理深度嵌套字典的便捷方法

leetcode-1111-有效括号的嵌套深度

从 Python 编写嵌套拼花格式

1111.有效括号的嵌套深度

有效括号的嵌套深度