使用 PyMongo 将 Pandas 数据框插入 mongodb
Posted
技术标签:
【中文标题】使用 PyMongo 将 Pandas 数据框插入 mongodb【英文标题】:Insert a Pandas Dataframe into mongodb using PyMongo 【发布时间】:2013-12-08 15:38:17 【问题描述】:使用 PyMongo
将 pandas DataFrame 插入 mongodb 的最快方法是什么?
尝试
db.myCollection.insert(df.to_dict())
报错
InvalidDocument: documents must have only string keys, the key was Timestamp('2013-11-23 13:31:00', tz=None)
db.myCollection.insert(df.to_json())
报错
TypeError: 'str' object does not support item assignment
db.myCollection.insert(id: df.to_json())
报错
InvalidDocument: documents must have only string a keys, key was <built-in function id>
df
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 150 entries, 2013-11-23 13:31:26 to 2013-11-23 13:24:07
Data columns (total 3 columns):
amount 150 non-null values
price 150 non-null values
tid 150 non-null values
dtypes: float64(2), int64(1)
【问题讨论】:
之后你想做什么?你想要每条记录一个文档还是每个数据帧一个文档? 每个 mongo 记录都将包含date
、amount
、price
和 tid 字段。 tid
应该是唯一字段
您可以通过records = json.loads(df.to_json(orient='records'))
将数据帧转换为dict-list,结果将是:['c1': 1, 'c2': 1,'c1': 2, 'c2': 2,'c1': 3, 'c2': 3]
,然后只需使用db.coll.insert_many(records)
。顺便说一句,使用df.to_dict('recoreds')
可能会反击Type error
【参考方案1】:
这个怎么样:
db.myCollection.insert(id: df.to_json())
id 将是该 df 的唯一字符串
【讨论】:
谢谢,我收到了错误InvalidDocument: documents must have only string keys, key was <built-in function id>
你必须自己生成那个id
这个id和mongo文档中常用的_.id
一样吗?如果是这样,它看起来像一个随机哈希,我如何生成它?
@Nyxynyx 失败,因为 id 是 Python 中的内置函数,不建议覆盖。您可以使用 id(df) 生成一个简单的 test-id,但由于对象 ID 在会话中不是持久的,这可能会根据您使用它的方式给您带来问题。虽然适用于测试。
我收到maximum recursion level reached
错误。用sys.setrecursionlimit(1000000)
修复了它【参考方案2】:
我怀疑是否存在 quickest 和 simple 两种方法。如果你不担心数据转换,你可以这样做
>>> import json
>>> df = pd.DataFrame.from_dict('A': 1: datetime.datetime.now())
>>> df
A
1 2013-11-23 21:14:34.118531
>>> records = json.loads(df.T.to_json()).values()
>>> db.myCollection.insert(records)
但如果你尝试load data back,你会得到:
>>> df = read_mongo(db, 'myCollection')
>>> df
A
0 1385241274118531000
>>> df.dtypes
A int64
dtype: object
因此您必须将“A”列转换回datetime
s,以及DataFrame
中所有非int
、float
或str
字段。对于这个例子:
>>> df['A'] = pd.to_datetime(df['A'])
>>> df
A
0 2013-11-23 21:14:34.118531
【讨论】:
db.myCollection.insert(records)
应替换为 db.myCollection.insert_many(records)
参见警告//anaconda/bin/ipython:1: DeprecationWarning: insert is deprecated. Use insert_one or insert_many instead. #!/bin/bash //anaconda/bin/python.app
【参考方案3】:
这里有最快捷的方法。使用 pymongo 3 中的 insert_many
方法和 to_dict
方法的“记录”参数。
db.collection.insert_many(df.to_dict('records'))
【讨论】:
这是 imo 最好的主意,尽管我认为该语法不适用于原始用例。基本问题是 mongo 需要字符串键,而您的 df 有一个 Timestamp 索引。您需要使用传递给to_dict()
的参数来使 mongo 中的键不是日期。我遇到的一个常见用例是,您实际上希望 df 中的每一行都成为带有附加“日期”字段的记录。
您应该更正代码 sn-p 以包含该集合。
这不会保留任何数据类型吗?示例 'numfield': NumberLong("16797951")【参考方案4】:
odo 可以使用
odo(df, db.myCollection)
【讨论】:
我真的很喜欢odo
,但是当 mongo uri 有非 alpha 用户名 passwd 时,它会非常失败。除了使用未经身份验证的 mongo 之外,我不会推荐它。
我认为 odo 的开发最近已经停止或推迟,截至 2019 年。【参考方案5】:
如果您的数据框缺少数据(即 None、nan)并且您不希望文档中有 null 键值:
db.insert_many(df.to_dict("records"))
将插入具有空值的键。如果您不希望文档中出现空键值,您可以使用修改版的 pandas .to_dict("records")
代码如下:
from pandas.core.common import _maybe_box_datetimelike
my_list = [dict((k, _maybe_box_datetimelike(v)) for k, v in zip(df.columns, row) if v != None and v == v) for row in df.values]
db.insert_many(my_list)
if v != None and v == v
我添加了检查以确保在将其放入行字典之前该值不是None
或nan
。现在您的 .insert_many
将只包含文档中具有值的键(并且没有 null
数据类型)。
【讨论】:
这是一个好方法,因为在上传dataframe到mongodb的时候确实需要处理空值,而且这个方法比DataFrame.to_dict()
快,顺便说一句,columns = list(df.columns)
,然后[k: _maybe_box_datetimelike(v) for k, v in zip(columns, row) if v != None and v == v for row in df.values]
更快.【参考方案6】:
我认为这个问题有很酷的想法。就我而言,我一直在花时间更多地处理大型数据帧的移动。在这种情况下,pandas 倾向于允许您选择 chunksize(例如 pandas.DataFrame.to_sql 中的示例)。所以我想我可以通过添加我在这个方向上使用的功能来做出贡献。
def write_df_to_mongoDB( my_df,\
database_name = 'mydatabasename' ,\
collection_name = 'mycollectionname',
server = 'localhost',\
mongodb_port = 27017,\
chunk_size = 100):
#"""
#This function take a list and create a collection in MongoDB (you should
#provide the database name, collection, port to connect to the remoete database,
#server of the remote database, local port to tunnel to the other machine)
#
#---------------------------------------------------------------------------
#Parameters / Input
# my_list: the list to send to MongoDB
# database_name: database name
#
# collection_name: collection name (to create)
# server: the server of where the MongoDB database is hosted
# Example: server = 'XXX.XXX.XX.XX'
# this_machine_port: local machine port.
# For example: this_machine_port = '27017'
# remote_port: the port where the database is operating
# For example: remote_port = '27017'
# chunk_size: The number of items of the list that will be send at the
# some time to the database. Default is 100.
#
#Output
# When finished will print "Done"
#----------------------------------------------------------------------------
#FUTURE modifications.
#1. Write to SQL
#2. Write to csv
#----------------------------------------------------------------------------
#30/11/2017: Rafael Valero-Fernandez. Documentation
#"""
#To connect
# import os
# import pandas as pd
# import pymongo
# from pymongo import MongoClient
client = MongoClient('localhost',int(mongodb_port))
db = client[database_name]
collection = db[collection_name]
# To write
collection.delete_many() # Destroy the collection
#aux_df=aux_df.drop_duplicates(subset=None, keep='last') # To avoid repetitions
my_list = my_df.to_dict('records')
l = len(my_list)
ran = range(l)
steps=ran[chunk_size::chunk_size]
steps.extend([l])
# Inser chunks of the dataframe
i = 0
for j in steps:
print j
collection.insert_many(my_list[i:j]) # fill de collection
i = j
print('Done')
return
【讨论】:
这真的很有用,谢谢。您可能希望使用当前输入更新 Args(输入)部分。 AttributeError: 'range' 对象没有属性 'extend'【参考方案7】:只需制作字符串键!
import json
dfData = json.dumps(df.to_dict('records'))
savaData = '_id': 'a8e42ed79f9dae1cefe8781760231ec0', 'df': dfData
res = client.insert_one(savaData)
##### load dfData
data = client.find_one('_id': 'a8e42ed79f9dae1cefe8781760231ec0').get('df')
dfData = json.loads(data)
df = pd.DataFrame.from_dict(dfData)
【讨论】:
【参考方案8】:如果您想一次发送多个:
db.myCollection.insert_many(df.apply(lambda x: x.to_dict(), axis=1).to_list())
【讨论】:
【参考方案9】:如果您想确保不会引发 InvalidDocument 错误,那么类似以下内容是个好主意。这是因为 mongo 无法识别 np.int64、np.float64 等类型。
from pymongo import MongoClient
client = MongoClient()
db = client.test
col = db.col
def createDocsFromDF(df, collection = None, insertToDB=False):
docs = []
fields = [col for col in df.columns]
for i in range(len(df)):
doc = col:df[col][i] for col in df.columns if col != 'index'
for key, val in doc.items():
# we have to do this, because mongo does not recognize these np. types
if type(val) == np.int64:
doc[key] = int(val)
if type(val) == np.float64:
doc[key] = float(val)
if type(val) == np.bool_:
doc[key] = bool(val)
docs.append(doc)
if insertToDB and collection:
db.collection.insert_many(docs)
return docs
【讨论】:
【参考方案10】:我使用以下部分将数据框插入到数据库中的集合中。
df.reset_index(inplace=True)
data_dict = df.to_dict("records")
myCollection.insert_many(data_dict)
【讨论】:
以上是关于使用 PyMongo 将 Pandas 数据框插入 mongodb的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 pandas.read_csv 将 CSV 文件中的数据插入数据框?
根据 pandas 数据框中的条件将 value_counts 与 groupby 函数一起使用并插入新列
如何通过 executemany() 语句转换 pandas 数据框以进行插入?