如何在python 2.7中使用pymongo进行多处理池
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在python 2.7中使用pymongo进行多处理池相关的知识,希望对你有一定的参考价值。
我正在与Pymongo和Multiprocessing Pool一起运行10个进程并从API获取数据并将输出插入到mongodb中。
我认为我编写代码的方式做错了,因为python显示双连接打开比通常情况要多;例如:如果我运行10个进程,Mongodb将输出20个或更多连接已建立,我将在启动时收到以下警告:
UserWarning:MongoClient在fork之前打开。使用connect = False创建MongoClient,或者在分叉后创建客户端。有关详细信息,请参阅PyMongo的文档:http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing>
甚至我在mongodb的连接器客户端输入connect = False。这是一个示例代码,用于了解我如何使用pymongo并请求API在池中发送请求:
# -*- coding: utf-8 -*-
#!/usr/bin/python
import json # to decode and encode json
import requests # web POST and GET requests.
from pymongo import MongoClient # the mongo driver / connector
from bson import ObjectId # to generate bson object for MongoDB
from multiprocessing import Pool # for the multithreading
# Create the mongoDB Database object, declare collections
client = MongoClient('mongodb://192.168.0.1:27017,192.168.0.2:27017./?replicaSet=rs0', maxPoolSize=20, connect=False)
index = client.database.index
users = client.database.users
def get_user(userid):
params = {"userid":userid}
r = requests.get("https://exampleapi.com/getUser",params=params)
j = json.loads(r.content)
return j
def process(index_line):
user = get_user(index_line["userid"])
if(user):
users.insert(user)
def main():
# limit to 100,000 lines of data each loop
limited = 100
# skip number of lines for the loop (getting updated)
skipped = 0
while True:
# get cursor with data from index collection
cursor = index.find({},no_cursor_timeout=True).skip(skipped).limit(limited)
# prepare the pool with threads
p = Pool(10)
# start multiprocessing the pool with the dataset
p.map(process, cursor)
# after pool finished, kill it with fire
p.close()
p.terminate()
p.join()
# after finishing the 100k lines, go for another round, inifnite.
skipped = skipped + limited
print "[-] Skipping %s " % skipped
if __name__ == '__main__':
main()
我的代码算法有什么问题吗?有没有办法让它更有效率,更好地工作,更好地控制我的游泳池?
我已经研究了很长一段时间但是找不到办法以更好的方式做我想做的事情,希望得到一些帮助。
谢谢。
建议为每个进程创建一次MongoClient
,而不是为每个进程共享相同的客户端。
这是因为MongoClient
还使用连接池处理来自进程的多个连接,并且不是fork安全的。
首先,您希望确保当要处理的集合中的每个文档都已用完时,while循环中断。虽然这不是一个太精细的条件,但如果skipped
大于文档数,则可以打破循环。
其次,在循环外部初始化进程Pool
并在循环内映射进程。 multiprocessing.Pool.map
等待子进程完成并返回,因此加入池将导致异常。如果您想异步运行子进程,可以考虑使用multiprocessing.Pool.async_map
。
您可以使用multiprocessing.Queue
,producer和consumer进程以更好的方式明确地实现它。生产者进程将向队列添加任务以由消费者进程执行。以这种方式实现解决方案的好处并不是很清楚,因为多处理库也使用了队列。
import requests # web POST and GET requests.
from pymongo import MongoClient # the mongo driver / connector
from bson import ObjectId # to generate bson object for MongoDB
from multiprocessing import Pool # for the multithreading
def get_user(userid):
params = {"userid": userid}
rv = requests.get("https://exampleapi.com/getUser", params=params)
json = rv.json()
return json['content']
def create_connect():
return MongoClient(
'mongodb://192.168.0.1:27017,192.168.0.2:27017/?replicaSet=rs0', maxPoolSize=20
)
def consumer(index_line):
client = create_connect()
users = client.database.users
user = get_user(index_line["_id"])
if user:
users.insert(user)
def main():
# limit to 100,000 lines of data each loop
limited = 100
# skip number of lines for the loop (getting updated)
skipped = 0
client = create_connect()
index = client.database.index
pool = Pool(10)
count = index.count()
while True:
if skipped > count:
break
cursor = index.find({}).skip(skipped).limit(limited)
pool.map(consumer, cursor)
skipped = skipped + limited
print("[-] Skipping {}".format(skipped))
if __name__ == '__main__':
main()
以上是关于如何在python 2.7中使用pymongo进行多处理池的主要内容,如果未能解决你的问题,请参考以下文章
如何使用Python 2.7对YoBit API进行身份验证?
如何使用 python mongodb 客户端库(pymongo)更新 mongodb 集合中所有文档的字段“类型”