将数据从 MongoDB 并行加载到 python
Posted
技术标签:
【中文标题】将数据从 MongoDB 并行加载到 python【英文标题】:Parallelizing loading data from MongoDB into python 【发布时间】:2017-10-19 18:45:12 【问题描述】:我在 MongoDB 中的集合中的所有文档都具有相同的字段。我的目标是将它们加载到 Python 中到 pandas.DataFrame
或 dask.DataFrame
。
我想通过并行化来加快加载过程。我的计划是产生几个进程或线程。每个进程都会加载一个集合的一个块,然后这些块将被合并在一起。
如何正确使用 MongoDB?
我在 PostgreSQL 上尝试过类似的方法。我最初的想法是在 SQL 查询中使用SKIP
和LIMIT
。它失败了,因为为每个特定查询打开的每个游标都从头开始读取数据表并且只是跳过了指定数量的行。所以我不得不创建额外的列,包含记录号,并在查询中指定这些数字的范围。
相反,MongoDB 为每个文档分配唯一的 ObjectID。但是,我发现不可能从另一个 ObjectID 中减去一个 ObjectID,它们只能与排序操作进行比较:小于、大于和等于。
另外,pymongo
返回光标对象,它支持索引操作并有一些方法,对我的任务似乎很有用,例如count
、limit
。
用于 Spark 的 MongoDB 连接器以某种方式完成了这项任务。不幸的是,我对 Scala 并不熟悉,因此,我很难知道他们是如何做到的。
那么,将数据从 Mongo 并行加载到 python 中的正确方法是什么?
到目前为止,我已经得出以下解决方案:
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
# import other modules.
collection = get_mongo_collection()
cursor = collection.find( )
def process_document(in_doc):
out_doc = # process doc keys and values
return pd.DataFrame(out_doc)
df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )
但是,看起来dask.dataframe.from_delayed
在内部从传递的生成器创建了一个列表,有效地将所有集合加载到一个线程中。
更新。我发现in docs,pymongo.Cursor
的 skip
方法也从集合的开头开始,如 PostgreSQL。同一页面建议在应用程序中使用分页逻辑。到目前为止,我找到的解决方案为此使用 sorted _id
。但是,它们还存储最后一次看到的_id
,这意味着它们也可以在单个线程中工作。
更新2。我在官方的 MongoDb Spark 连接器中找到了 partitioner 的代码:https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/MongoPaginationPartitioner.scala#L32
看起来,最初这个分区器从集合中的所有文档中读取关键字段并计算值的范围。
Update3:我的不完整解决方案。
不起作用,从 pymongo 获取异常,因为 dask 似乎错误地处理了 Collection
对象:
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in <genexpr>(***failed resolving arguments***)
81 return expr,
82 if isinstance(expr, (Iterator, list, tuple, set)):
---> 83 args, dasks = unzip((to_task_dask(e) for e in expr), 2)
84 args = list(args)
85 dsk = sharedict.merge(*dasks)
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self)
2342
2343 def __next__(self):
-> 2344 raise TypeError("'Collection' object is not iterable")
2345
2346 next = __next__
TypeError: 'Collection' object is not iterable
引发异常的原因:
def process_document(in_doc, other_arg):
# custom processing of incoming records
return out_doc
def compute_id_ranges(collection, query, partition_size=50):
cur = collection.find(query, '_id': 1).sort('_id', pymongo.ASCENDING)
id_ranges = [cur[0]['_id']]
count = 1
for r in cur:
count += 1
if count > partition_size:
id_ranges.append(r['_id'])
count = 0
id_ranges.append(r['_id'])
return zip(id_ranges[:len(id_ranges)-1], id_ranges[1: ])
def load_chunk(id_pair, collection, query=, projection=None):
q = query
q.update( "_id": "$gte": id_pair[0], "$lt": id_pair[1] )
cur = collection.find(q, projection)
return pd.DataFrame([process_document(d, other_arg) for d in cur])
def parallel_load(*args, **kwargs):
collection = kwargs['collection']
query = kwargs.get('query', )
projection = kwargs.get('projection', None)
id_ranges = compute_id_ranges(collection, query)
dfs = [ delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges ]
df = dd.from_delayed(dfs)
return df
collection = connect_to_mongo_and_return_collection_object(credentials)
# df = parallel_load(collection=collection)
id_ranges = compute_id_ranges(collection)
dedf = delayed(load_chunk)(id_ranges[0], collection)
load_chunk
直接调用时完美运行。但是,调用 delayed(load_chunk)( blah-blah-blah )
失败并出现异常,如上所述。
【问题讨论】:
我认为您的直觉就在这里,您希望构建几个获取数据集不同部分的 mongo 查询,然后使用 dask.delayed 并行加载这些,最后构建一些像数据框一样的 dask 集合与dask.dataframe.from_delayed
。我认为这里缺少的部分是分页。我会 ping 一位 Mongo 开发人员以获取更多信息。
我编写了一个函数,在两个预定义的_id
s 之间加载带有_id
s 的文档块。 def load_chunk(id_pair, collection, query=, projection=None)
但是如果我将这个函数包装在 delayed
中,它会尝试迭代 Mongo 集合,并获得集合不可迭代的异常。 dfs = delayed(load_chunk)(id_pair, collection, query, projection)
抱歉,现在没有时间重复示例。
【参考方案1】:
我正在研究 pymongo 并行化,这对我有用。我用了我不起眼的游戏笔记本电脑将近 100 分钟来处理我的 4000 万个文档的 mongodb。 CPU 使用率为 100%,我必须打开空调 :)
我使用跳过和限制函数来拆分数据库,然后将批次分配给进程。代码是为 Python 3 编写的:
import multiprocessing
from pymongo import MongoClient
def your_function(something):
<...>
return result
def process_cursor(skip_n,limit_n):
print('Starting process',skip_n//limit_n,'...')
collection = MongoClient().<db_name>.<collection_name>
cursor = collection.find().skip(skip_n).limit(limit_n)
for doc in cursor:
<do your magic>
# for example:
result = your_function(doc['your_field'] # do some processing on each document
# update that document by adding the result into a new field
collection.update_one('_id': doc['_id'], '$set': '<new_field_eg>': result )
print('Completed process',skip_n//limit_n,'...')
if __name__ == '__main__':
n_cores = 7 # number of splits (logical cores of the CPU-1)
collection_size = 40126904 # your collection size
batch_size = round(collection_size/n_cores+0.5)
skips = range(0, n_cores*batch_size, batch_size)
processes = [ multiprocessing.Process(target=process_cursor, args=(skip_n,batch_size)) for skip_n in skips]
for process in processes:
process.start()
for process in processes:
process.join()
最后一次拆分的限制将大于其余文档,但这不会引发错误
【讨论】:
这是一个非常低效的解决方案。每个游标都从收集开始,只是丢弃了一些记录。 Mongo手册里有说。 @wl2776 没错,我知道手册中提到的这一点。然而,跳过只在进程开始时进行一次,最坏的情况(即最后一批)大约需要 15 秒才能跳过 3500 万条记录。一旦跳过,光标将从该点继续。与总处理时间(100 分钟)相比,40 百万收集的跳过时间可以忽略不计。在性能方面,我在每条记录上使用 30 行正则表达式命令,平均每分钟处理 400,000 个文档,或每秒处理 6600 个文档 一年后我得出的结论是,对于科学应用来说,这是一个很好的解决方案,在这种应用中,只需将数据加载到 RAM 中一次。 推文 NLP 处理的绝佳选择。【参考方案2】:我认为dask-mongo
会在这里完成工作。您可以使用 pip 或 conda 安装它,在 repo 中您可以在 notebook 中找到一些示例。
dask-mongo
会将您在 MongoDB 中的数据作为 Dask 包读取,但随后您可以使用 df = b.to_dataframe()
从 Dask 包转到 Dask Dataframe,其中 b
是您使用 @987654326 从 mongo 读取的包@
【讨论】:
【参考方案3】:“阅读手册,这是规则”:)
pymongo.Collection
具有返回游标列表的方法 parallel_scan
。
更新。如果集合不经常更改并且查询始终相同(我的情况),则此功能可以完成工作。可以将查询结果存储在不同的集合中并运行并行扫描。
【讨论】:
还没有。一些限制仍然存在。 groups.google.com/d/msg/mongodb-user/8qshghoR4WU/9d3Chf7fFAAJ - 相同的想法以上是关于将数据从 MongoDB 并行加载到 python的主要内容,如果未能解决你的问题,请参考以下文章
使用 python 3.6 将多个文件并行加载到内存中的最佳方法是啥?
使用 mongoimport 将数据从 HDFS 导入 MongoDB
从外部 .json 文件加载默认数据并存储到 MongoDB