将嵌套的 MongoDB 导入到 Pandas

Posted

技术标签:

【中文标题】将嵌套的 MongoDB 导入到 Pandas【英文标题】:Import nested MongoDB to Pandas 【发布时间】:2019-01-12 21:02:42 【问题描述】:

我在 MongoDB 中有一个包含大量嵌套文档的集合,我想展平并导入到 Pandas。有一些嵌套的字典,还有一个我想转换为列的字典列表(有关详细信息,请参见下面的示例)。

我已经有了适用于小批量文档的功能。但是解决方案(我发现它in the answer to this question)使用 json。 json.loads 操作的问题是,它在集合中更大的选择上失败,MemoryError

我尝试了许多建议其他 json 解析器(例如 ijson)的解决方案,但由于不同的原因,它们都没有解决我的问题。如果我想通过 json 保持转换,剩下的唯一方法是将更大的选择分块成更小的文档组并迭代解析。

此时我想,-这是我的主要问题-也许有一种更聪明的方法来进行取消嵌套,而无需直接在 MongoDB 或 Pandas 中通过 json 或以某种方式结合使用?

这是一个简短的示例文档:


  '_id': ObjectId('5b40fcc4affb061b8871cbc5'),
  'eventId': 2,
  'sId' : 6833,
  'stage': 
    'value': 1,
    'Name': 'FirstStage'
  ,
  'quality': [
    
      'type': 
        'value': 2,
        'Name': 'Color'
      ,
      'value': '124'
    ,
    
      'type': 
        'value': 7,
        'Name': 'Length'
      ,
      'value': 'Short'
    ,
    
      'type': 
        'value': 15,
        'Name': 'Printed'
      
    

这就是成功的数据框表示的样子(为了便于阅读,我跳过了列“_id”和“sId”:

    eventId    stage.value    stage.name    q_color    q_length    q_printed
1   2          1              'FirstStage'  124        'Short'     1 

到目前为止我的代码(遇到内存问题 - 见上文):

def load_events(filter = 'sId', id = 6833, all = False):
  if all:
    print('Loading all events.')
    cursor = events.find()
  else:
    print('Loading events with %s equal to %s.' %(filter, id))
    print('Filtering...')
    cursor = events.find(filter : id)

  print('Loading...')
  l = list(cursor)

  print('Parsing json...')
  sanitized = json.loads(json_util.dumps(l))

  print('Parsing quality...')
  for ev in sanitized:
    for q in ev['quality']:
        name = 'q_' + str(q['type']['Name'])
        value = q.pop('value', 1)
        ev[name] = value
    ev.pop('quality',None)

  normalized = json_normalize(sanitized)

  df = pd.DataFrame(normalized)

  return df

【问题讨论】:

【参考方案1】:

您不需要使用 json 解析器转换嵌套结构。只需从记录列表中创建您的数据框:

df = DataFrame(list(cursor))

然后使用 pandas 来解压您的列表和字典:

import pandas
from itertools import chain
import numpy

df = pandas.DataFrame(t)
df['stage.value'] = df['stage'].apply(lambda cell: cell['value'])
df['stage.name'] = df['stage'].apply(lambda cell: cell['Name'])
df['q_']= df['quality'].apply(lambda cell: [(m['type']['Name'], m['value'] if 'value' in m.keys() else 1) for m in cell])
df['q_'] = df['q_'].apply(lambda cell: dict((k, v) for k, v in cell))
keys = set(chain(*df['q_'].apply(lambda column: column.keys())))
for key in keys:
    column_name = 'q_'.format(key).lower()
    df[column_name] = df['q_'].apply(lambda cell: cell[key] if key in cell.keys() else numpy.NaN) 
df.drop(['stage', 'quality', 'q_'], axis=1, inplace=True)

我使用三个步骤来解压缩嵌套数据类型。首先,名称和值用于创建对(元组)的平面列表。在第二步中,基于元组的字典从元组的第一个位置获取键,从第二个位置获取值。然后使用集合提取所有现有属性名称一次。每个属性都使用循环获取一个新列。在循环内部,每对的值都映射到相应的列单元格。

【讨论】:

谢谢,这或多或少是我想要的。仍然存在一个问题,如上所示,该问题在数据中不可见。质量下有很多可能的条目(大约 100 个)。文档总是只有 0-10 个中的一小部分。如果 q_ 不在文档中,则尊重列的尊重字段中应该有一个 NaN。所以 1) 我想知道,如果您的解决方案会引发 KeyError? 2)我可以对它们中的每一个都进行硬编码,这似乎需要大量的工作并且不是很优雅。 PS:由于这似乎是一个全新的角度,我将打开一个后续问题并将您的答案标记为已接受。谢谢! 我更新了我的答案。现在它应该更好地满足您的需求。也许使用 Pymongo 查询来过滤所有键会更快。 哦,是的,它有效,非常感谢!我还尝试了 Pymongo 查询,它比你的“链魔法”慢得多。 ;-)

以上是关于将嵌套的 MongoDB 导入到 Pandas的主要内容,如果未能解决你的问题,请参考以下文章

似乎无法更新导入数据库中的嵌套值

从 mongoDB 迁移到 clickhouse 中的嵌套数据结构

如何将数据从mongodb导入hive

如何将数据从 mongodb 导入到 pandas?

实战使用 Kettle 工具将 mysql 数据增量导入到 MongoDB 中

使用列映射将 Excel 工作表导入到 Mongodb 数据库