如何将数据从 mongodb 导入到 pandas?
Posted
技术标签:
【中文标题】如何将数据从 mongodb 导入到 pandas?【英文标题】:How to import data from mongodb to pandas? 【发布时间】:2013-04-21 09:54:21 【问题描述】:我在 mongodb 的集合中有大量数据需要分析。如何将这些数据导入 pandas?
我是 pandas 和 numpy 的新手。
编辑: mongodb 集合包含带有日期和时间标记的传感器值。传感器值是浮点数据类型。
样本数据:
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
"SensorReport"
],
"Readings" : [
"a" : 0.958069536790466,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
"b" : 6.296118156595,
"_cls" : "Reading"
,
"a" : 0.95574014778624,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
"b" : 6.29651468650064,
"_cls" : "Reading"
,
"a" : 0.953648289182713,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
"b" : 7.29679823731148,
"_cls" : "Reading"
,
"a" : 0.955931884300997,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
"b" : 6.29642922525632,
"_cls" : "Reading"
,
"a" : 0.95821381,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
"b" : 7.28956613,
"_cls" : "Reading"
,
"a" : 4.95821335,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
"b" : 6.28956574,
"_cls" : "Reading"
,
"a" : 9.95821341,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
"b" : 0.28956488,
"_cls" : "Reading"
,
"a" : 1.95667927,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
"b" : 0.29115237,
"_cls" : "Reading"
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
【问题讨论】:
使用 a custom field type 和 MongoEngine 可以像mongo_doc.data_frame = my_pandas_df
一样简单地存储和检索 Pandas DataFrames
【参考方案1】:
你可以使用the "pandas.json_normalize" method:
import pandas as pd
display(pd.json_normalize( x ))
display(pd.json_normalize( x , record_path="Readings" ))
它应该显示两个表格,其中 x 是您的光标或:
from bson import ObjectId
def ISODate(st):
return st
x =
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
"SensorReport"
],
"Readings" : [
"a" : 0.958069536790466,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
"b" : 6.296118156595,
"_cls" : "Reading"
,
"a" : 0.95574014778624,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
"b" : 6.29651468650064,
"_cls" : "Reading"
,
"a" : 0.953648289182713,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
"b" : 7.29679823731148,
"_cls" : "Reading"
,
"a" : 0.955931884300997,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
"b" : 6.29642922525632,
"_cls" : "Reading"
,
"a" : 0.95821381,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
"b" : 7.28956613,
"_cls" : "Reading"
,
"a" : 4.95821335,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
"b" : 6.28956574,
"_cls" : "Reading"
,
"a" : 9.95821341,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
"b" : 0.28956488,
"_cls" : "Reading"
,
"a" : 1.95667927,
"_types" : [
"Reading"
],
"ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
"b" : 0.29115237,
"_cls" : "Reading"
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
【讨论】:
【参考方案2】:您也可以使用pymongoarrow -- 这是 MongoDB 提供的官方库,用于将 mongodb 数据导出到 pandas、numPy、parquet 文件等。
【讨论】:
由于类型支持非常有限,这个库几乎没用,它甚至不支持 str。【参考方案3】:我发现另一个非常有用的选项是:
from pandas.io.json import json_normalize
cursor = my_collection.find()
df = json_normalize(cursor)
(或json_normalize(list(cursor))
,取决于您的 python/pandas 版本)。
这样您就可以免费展开嵌套的 mongodb 文档。
【讨论】:
这个方法有错误TypeError: data argument can't be an iterator
奇怪,这适用于我的 python 3.6.7
使用 pandas 0.24.2
。也许你可以试试df = json_normalize(list(cursor))
?
对于 +1。 docs,max_level 参数定义了字典深度的最大级别。我刚刚做了一个测试,但它不是真的,所以有些列需要用 .str 访问器拆分。不过,使用 mongodb 的功能非常好。【参考方案4】:
你可以用pdmongo在三行中实现你想要的:
import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")
如果您的数据非常大,您可以先通过过滤不需要的数据进行聚合查询,然后将它们映射到您想要的列。
这是一个将Readings.a
映射到a
列并按reportCount
列过滤的示例:
import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", ['$match': 'reportCount': '$gt': 6, '$unwind': '$Readings', '$project': 'a': '$Readings.a'], "mongodb://localhost:27017/mydb")
read_mongo
接受与pymongo aggregate 相同的参数
【讨论】:
【参考方案5】:http://docs.mongodb.org/manual/reference/mongoexport
导出到 csv 并使用 read_csv
或 JSON 并使用 DataFrame.from_records()
【讨论】:
它是DataFrame.from_records()
。【参考方案6】:
类似于 Rafael Valero、waitingkuo 和 Deu Leung 使用 pagination:
def read_mongo(
# db,
collection, query=None,
# host='localhost', port=27017, username=None, password=None,
chunksize = 100, page_num=1, no_id=True):
# Connect to MongoDB
db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
# Calculate number of documents to skip
skips = chunksize * (page_num - 1)
# Sorry, this is in spanish
# https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
if not query:
query =
# Make a query to the specific DB and Collection
cursor = db[collection].find(query).skip(skips).limit(chunksize)
# Expand the cursor and construct the DataFrame
df = pd.DataFrame(list(cursor))
# Delete the _id
if no_id:
del df['_id']
return df
【讨论】:
【参考方案7】:在waitingkuo 给出这个很好的答案之后,我想添加使用符合.read_sql() 和.read_csv() 的chunksize 的可能性。我通过避免逐个遍历“迭代器”/“光标”的每个“记录”来扩大Deu Leung 的答案。 我会借用之前的read_mongo函数。
def read_mongo(db,
collection, query=,
host='localhost', port=27017,
username=None, password=None,
chunksize = 100, no_id=True):
""" Read from Mongo and Store into DataFrame """
# Connect to MongoDB
#db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
client = MongoClient(host=host, port=port)
# Make a query to the specific DB and Collection
db_aux = client[db]
# Some variables to create the chunks
skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
if len(skips_variable)<=1:
skips_variable = [0,len(skips_variable)]
# Iteration to create the dataframe in chunks.
for i in range(1,len(skips_variable)):
# Expand the cursor and construct the DataFrame
#df_aux =pd.DataFrame(list(cursor_aux[skips_variable[i-1]:skips_variable[i]]))
df_aux =pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))
if no_id:
del df_aux['_id']
# Concatenate the chunks into a unique df
if 'df' not in locals():
df = df_aux
else:
df = pd.concat([df, df_aux], ignore_index=True)
return df
【讨论】:
【参考方案8】:根据 PEP,简单胜于复杂:
import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())
您可以像使用常规 mongoDB 数据库一样包含条件,甚至可以使用 find_one() 从数据库中仅获取一个元素,等等。
瞧!
【讨论】:
pd.DataFrame.from_records 似乎和 DataFrame(list()) 一样慢,但结果却很不一致。 %%time 显示从 800 毫秒到 1.9 秒的任何时间 这不适合大记录,因为这不会显示内存错误,instread 会因数据太大而挂起系统。而 pd.DataFrame(list(cursor)) 显示内存错误。【参考方案9】:import pandas as pd
from odo import odo
data = odo('mongodb://localhost/db::collection', pd.DataFrame)
【讨论】:
【参考方案10】:为了有效地处理核外(不适合 RAM)数据(即并行执行),您可以尝试 Python Blaze ecosystem: Blaze / Dask / Odo。
Blaze(和Odo)具有处理 MongoDB 的开箱即用功能。
一些有用的文章开始:
Introducing Blaze Expessions(带有 MongoDB 查询示例) ReproduceIt: Reddit word count Difference between Dask Arrays and Blaze还有一篇文章展示了 Blaze 堆栈可以实现哪些惊人的事情:Analyzing 1.7 Billion Reddit Comments with Blaze and Impala(本质上是在几秒钟内查询 975 Gb 的 Reddit cmets)。
附:我不隶属于任何这些技术。
【讨论】:
我还使用 Jupyter Notebook 编写了post,其中举例说明了 Dask 如何通过在单台机器上使用多个内核来帮助加快执行速度,即使数据适合内存。【参考方案11】:使用
pandas.DataFrame(list(...))
如果迭代器/生成器结果很大会消耗大量内存
最好生成小块并在最后连接
def iterator2dataframes(iterator, chunk_size: int):
"""Turn an iterator into multiple small pandas.DataFrame
This is a balance between memory and efficiency
"""
records = []
frames = []
for i, record in enumerate(iterator):
records.append(record)
if i % chunk_size == chunk_size - 1:
frames.append(pd.DataFrame(records))
records = []
if records:
frames.append(pd.DataFrame(records))
return pd.concat(frames)
【讨论】:
【参考方案12】:您可以使用此代码将您的 mongodb 数据加载到 pandas DataFrame。这个对我有用。希望你也一样。
import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))
【讨论】:
【参考方案13】:Monary
正是这样做的,而且它超级快。 (another link)
请参阅this cool post,其中包括快速教程和一些时间安排。
【讨论】:
Monary 是否支持字符串数据类型? 我试过 Monary,但它需要很多时间。我错过了一些优化吗?尝试client = Monary(host, 27017, database="db_tmp") columns = ["col1", "col2"] data_type = ["int64", "int64"] arrays = client.query("db_tmp", "coll", , columns, data_type)
对于50000
记录大约需要200s
。
听起来很慢...坦率地说,我不知道这个项目的状态,现在,4年后...【参考方案14】:
pymongo
可能会帮助您,以下是我正在使用的一些代码:
import pandas as pd
from pymongo import MongoClient
def _connect_mongo(host, port, username, password, db):
""" A util for making a connection to mongo """
if username and password:
mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
conn = MongoClient(mongo_uri)
else:
conn = MongoClient(host, port)
return conn[db]
def read_mongo(db, collection, query=, host='localhost', port=27017, username=None, password=None, no_id=True):
""" Read from Mongo and Store into DataFrame """
# Connect to MongoDB
db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
# Make a query to the specific DB and Collection
cursor = db[collection].find(query)
# Expand the cursor and construct the DataFrame
df = pd.DataFrame(list(cursor))
# Delete the _id
if no_id:
del df['_id']
return df
【讨论】:
谢谢,这是我最终使用的方法。我还在每一行中都有一组嵌入式文档。所以我必须在每一行中迭代它。有没有更好的方法来做到这一点?? 可以提供一些你的mongodb结构的样本吗? 请注意df = pd.DataFrame(list(cursor))
内部的list()
评估为列表或生成器,以保持CPU 冷却。如果你有无数个数据项,并且接下来的几行将合理划分、详细程度并剪裁它们,那么整个 shmegegge 仍然可以安全放入。很好。
很慢@df = pd.DataFrame(list(cursor))
。纯数据库查询要快得多。我们可以将list
转换为其他内容吗?
@Peter 那句话也引起了我的注意。将设计为可迭代并可能包装大量数据的数据库游标转换为内存列表对我来说似乎并不聪明。以上是关于如何将数据从 mongodb 导入到 pandas?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用mongo-connector将数据从mongodb导入到apache solr