使用 PyMongo 将 Pandas 数据框插入 mongodb

Posted

技术标签:

【中文标题】使用 PyMongo 将 Pandas 数据框插入 mongodb【英文标题】:Insert a Pandas Dataframe into mongodb using PyMongo 【发布时间】:2013-12-08 15:38:17 【问题描述】:

使用 PyMongo 将 pandas DataFrame 插入 mongodb 的最快方法是什么?

尝试

db.myCollection.insert(df.to_dict())

报错

InvalidDocument: documents must have only string keys, the key was Timestamp('2013-11-23 13:31:00', tz=None)


 db.myCollection.insert(df.to_json())

报错

TypeError: 'str' object does not support item assignment


 db.myCollection.insert(id: df.to_json())

报错

InvalidDocument: documents must have only string a keys, key was <built-in function id>


df

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 150 entries, 2013-11-23 13:31:26 to 2013-11-23 13:24:07
Data columns (total 3 columns):
amount    150  non-null values
price     150  non-null values
tid       150  non-null values
dtypes: float64(2), int64(1)

【问题讨论】:

之后你想做什么?你想要每条记录一个文档还是每个数据帧一个文档? 每个 mongo 记录都将包含 dateamountprice 和 tid 字段。 tid 应该是唯一字段 您可以通过records = json.loads(df.to_json(orient='records'))将数据帧转换为dict-list,结果将是:['c1': 1, 'c2': 1,'c1': 2, 'c2': 2,'c1': 3, 'c2': 3],然后只需使用db.coll.insert_many(records)。顺便说一句,使用df.to_dict('recoreds') 可能会反击Type error 【参考方案1】:

这个怎么样:

db.myCollection.insert(id: df.to_json())

id 将是该 df 的唯一字符串

【讨论】:

谢谢,我收到了错误InvalidDocument: documents must have only string keys, key was &lt;built-in function id&gt; 你必须自己生成那个id 这个id和mongo文档中常用的_.id一样吗?如果是这样,它看起来像一个随机哈希,我如何生成它? @Nyxynyx 失败,因为 id 是 Python 中的内置函数,不建议覆盖。您可以使用 id(df) 生成一个简单的 test-id,但由于对象 ID 在会话中不是持久的,这可能会根据您使用它的方式给您带来问题。虽然适用于测试。 我收到maximum recursion level reached 错误。用sys.setrecursionlimit(1000000) 修复了它【参考方案2】:

我怀疑是否存在 quickestsimple 两种方法。如果你不担心数据转换,你可以这样做

>>> import json
>>> df = pd.DataFrame.from_dict('A': 1: datetime.datetime.now())
>>> df
                           A
1 2013-11-23 21:14:34.118531

>>> records = json.loads(df.T.to_json()).values()
>>> db.myCollection.insert(records)

但如果你尝试load data back,你会得到:

>>> df = read_mongo(db, 'myCollection')
>>> df
                     A
0  1385241274118531000
>>> df.dtypes
A    int64
dtype: object

因此您必须将“A”列转换回datetimes,以及DataFrame 中所有非intfloatstr 字段。对于这个例子:

>>> df['A'] = pd.to_datetime(df['A'])
>>> df
                           A
0 2013-11-23 21:14:34.118531

【讨论】:

db.myCollection.insert(records) 应替换为 db.myCollection.insert_many(records) 参见警告//anaconda/bin/ipython:1: DeprecationWarning: insert is deprecated. Use insert_one or insert_many instead. #!/bin/bash //anaconda/bin/python.app【参考方案3】:

这里有最快捷的方法。使用 pymongo 3 中的 insert_many 方法和 to_dict 方法的“记录”参数。

db.collection.insert_many(df.to_dict('records'))

【讨论】:

这是 imo 最好的主意,尽管我认为该语法不适用于原始用例。基本问题是 mongo 需要字符串键,而您的 df 有一个 Timestamp 索引。您需要使用传递给 to_dict() 的参数来使 mongo 中的键不是日期。我遇到的一个常见用例是,您实际上希望 df 中的每一行都成为带有附加“日期”字段的记录。 您应该更正代码 sn-p 以包含该集合。 这不会保留任何数据类型吗?示例 'numfield': NumberLong("16797951")【参考方案4】:

odo 可以使用

odo(df, db.myCollection)

【讨论】:

我真的很喜欢odo,但是当 mongo uri 有非 alpha 用户名 passwd 时,它会非常失败。除了使用未经身份验证的 mongo 之外,我不会推荐它。 我认为 odo 的开发最近已经停止或推迟,截至 2019 年。【参考方案5】:

如果您的数据框缺少数据(即 None、nan)并且您不希望文档中有 null 键值:

db.insert_many(df.to_dict("records")) 将插入具有空值的键。如果您不希望文档中出现空键值,您可以使用修改版的 pandas .to_dict("records") 代码如下:

from pandas.core.common import _maybe_box_datetimelike
my_list = [dict((k, _maybe_box_datetimelike(v)) for k, v in zip(df.columns, row) if v != None and v == v) for row in df.values]
db.insert_many(my_list)

if v != None and v == v 我添加了检查以确保在将其放入行字典之前该值不是Nonenan。现在您的 .insert_many 将只包含文档中具有值的键(并且没有 null 数据类型)。

【讨论】:

这是一个好方法,因为在上传dataframe到mongodb的时候确实需要处理空值,而且这个方法比DataFrame.to_dict()快,顺便说一句,columns = list(df.columns),然后[k: _maybe_box_datetimelike(v) for k, v in zip(columns, row) if v != None and v == v for row in df.values]更快.【参考方案6】:

我认为这个问题有很酷的想法。就我而言,我一直在花时间更多地处理大型数据帧的移动。在这种情况下,pandas 倾向于允许您选择 chunksize(例如 pandas.DataFrame.to_sql 中的示例)。所以我想我可以通过添加我在这个方向上使用的功能来做出贡献。

def write_df_to_mongoDB(  my_df,\
                          database_name = 'mydatabasename' ,\
                          collection_name = 'mycollectionname',
                          server = 'localhost',\
                          mongodb_port = 27017,\
                          chunk_size = 100):
    #"""
    #This function take a list and create a collection in MongoDB (you should
    #provide the database name, collection, port to connect to the remoete database,
    #server of the remote database, local port to tunnel to the other machine)
    #
    #---------------------------------------------------------------------------
    #Parameters / Input
    #    my_list: the list to send to MongoDB
    #    database_name:  database name
    #
    #    collection_name: collection name (to create)
    #    server: the server of where the MongoDB database is hosted
    #        Example: server = 'XXX.XXX.XX.XX'
    #    this_machine_port: local machine port.
    #        For example: this_machine_port = '27017'
    #    remote_port: the port where the database is operating
    #        For example: remote_port = '27017'
    #    chunk_size: The number of items of the list that will be send at the
    #        some time to the database. Default is 100.
    #
    #Output
    #    When finished will print "Done"
    #----------------------------------------------------------------------------
    #FUTURE modifications.
    #1. Write to SQL
    #2. Write to csv
    #----------------------------------------------------------------------------
    #30/11/2017: Rafael Valero-Fernandez. Documentation
    #"""



    #To connect
    # import os
    # import pandas as pd
    # import pymongo
    # from pymongo import MongoClient

    client = MongoClient('localhost',int(mongodb_port))
    db = client[database_name]
    collection = db[collection_name]
    # To write
    collection.delete_many()  # Destroy the collection
    #aux_df=aux_df.drop_duplicates(subset=None, keep='last') # To avoid repetitions
    my_list = my_df.to_dict('records')
    l =  len(my_list)
    ran = range(l)
    steps=ran[chunk_size::chunk_size]
    steps.extend([l])

    # Inser chunks of the dataframe
    i = 0
    for j in steps:
        print j
        collection.insert_many(my_list[i:j]) # fill de collection
        i = j

    print('Done')
    return

【讨论】:

这真的很有用,谢谢。您可能希望使用当前输入更新 Args(输入)部分。 AttributeError: 'range' 对象没有属性 'extend'【参考方案7】:

只需制作字符串键!

import json
dfData = json.dumps(df.to_dict('records'))
savaData = '_id': 'a8e42ed79f9dae1cefe8781760231ec0', 'df': dfData
res = client.insert_one(savaData)

##### load dfData
data = client.find_one('_id': 'a8e42ed79f9dae1cefe8781760231ec0').get('df')
dfData = json.loads(data)
df = pd.DataFrame.from_dict(dfData)

【讨论】:

【参考方案8】:

如果您想一次发送多个:

db.myCollection.insert_many(df.apply(lambda x: x.to_dict(), axis=1).to_list())

【讨论】:

【参考方案9】:

如果您想确保不会引发 InvalidDocument 错误,那么类似以下内容是个好主意。这是因为 mongo 无法识别 np.int64、np.float64 等类型。

from pymongo import MongoClient
client = MongoClient()
db = client.test 
col = db.col


def createDocsFromDF(df, collection = None, insertToDB=False):
    docs = [] 
    fields = [col for col in df.columns]
    for i in range(len(df)):
        doc = col:df[col][i] for col in df.columns if col != 'index'
        for key, val in doc.items():
            # we have to do this, because mongo does not recognize these np. types
            if type(val) == np.int64:
                doc[key] = int(val)
            if type(val) == np.float64:
                doc[key] = float(val)
            if type(val) == np.bool_:
                doc[key] = bool(val)
        docs.append(doc) 
    if insertToDB and collection:
        db.collection.insert_many(docs)
    return docs 

【讨论】:

【参考方案10】:

我使用以下部分将数据框插入到数据库中的集合中。

df.reset_index(inplace=True)
data_dict = df.to_dict("records")
myCollection.insert_many(data_dict)

【讨论】:

以上是关于使用 PyMongo 将 Pandas 数据框插入 mongodb的主要内容,如果未能解决你的问题,请参考以下文章

将 Pandas 数据框插入 Cassandra 表

如何使用 pandas.read_csv 将 CSV 文件中的数据插入数据框?

根据 pandas 数据框中的条件将 value_counts 与 groupby 函数一起使用并插入新列

如何通过 executemany() 语句转换 pandas 数据框以进行插入?

在 pandas 数据框中插入 sklearn CountVectorizer 的结果

使用 SQLAlchemy 和 Pandas 插入数据 - Python