多进程:持久池?
Posted
技术标签:
【中文标题】多进程:持久池?【英文标题】:Multiprocess : Persistent Pool? 【发布时间】:2021-08-13 06:26:13 【问题描述】:我有如下代码:
def expensive(self,c,v):
.....
def inner_loop(self,c,collector):
self.db.query('SELECT ...',(c,))
for v in self.db.cursor.fetchall() :
collector.append( self.expensive(c,v) )
def method(self):
# create a Pool
#join the Pool ??
self.db.query('SELECT ...')
for c in self.db.cursor.fetchall() :
collector = []
#RUN the whole cycle in parallel in separate processes
self.inner_loop(c, collector)
#do stuff with the collector
#! close the pool ?
外循环和内循环都有数千步...... 我想我了解如何运行一个包含几个进程的池。 我发现的所有例子都或多或少地表明了这一点。
但在我的情况下,我需要午餐一个持久池,然后提供数据(c 值)。一旦内循环过程完成,我必须提供下一个可用的 c 值。 并保持流程运行并收集结果。
我该怎么做?
我有个笨拙的想法是:
def method(self):
ws = 4
with Pool(processes=ws) as pool :
cs = []
for i,c in enumerate(..) :
cs.append(c)
if i % ws == 0 :
res = [pool.apply(self.inner_loop, (c)) for i in range(ws)]
cs = []
collector.append(res)
这会保持同一个池运行吗?即不是每次都午餐新进程?i
我是否需要 'if i % ws == 0' 部分,或者我可以使用 imap()、map_async() 和 Pool obj 将在可用工作人员用尽时阻塞循环,并在释放一些工作人员时继续?
【问题讨论】:
你见过docs.python.org/3/library/… 吗?如果是这样,您能否详细说明您特别困惑的地方? 我的主要困惑是你如何划分 N-c 值并保持 N-进程的馈送。 for 循环也必须一次输入 N 个值,但它一次生成一个 【参考方案1】:是的,multiprocessing.Pool
的工作方式是:
池中的工作进程通常在池的工作队列的整个持续时间内都存在。
因此,只需通过imap
将所有工作提交到池中就足够了:
with Pool(processes=4) as pool:
initial_results = db.fetchall("SELECT c FROM outer")
results = [pool.imap(self.inner_loop, (c,)) for c in initial_results]
也就是说,如果您真的这样做是为了从数据库中获取数据,那么可能将更多的处理向下移到该层(将计算带到数据中,而不是将数据到计算)。
【讨论】:
谢谢。在我的情况下,对大查询进行分区并预缓存初始批次并使用池使我的工作在 1 小时内完成,而 5++ 小时;)以上是关于多进程:持久池?的主要内容,如果未能解决你的问题,请参考以下文章
python:多进程,多进程队列,多进程管道,Manager,进程锁,进程池