多进程:持久池?

Posted

技术标签:

【中文标题】多进程:持久池?【英文标题】:Multiprocess : Persistent Pool? 【发布时间】:2021-08-13 06:26:13 【问题描述】:

我有如下代码:

def expensive(self,c,v):
    .....

def inner_loop(self,c,collector):
    self.db.query('SELECT ...',(c,))
    for v in self.db.cursor.fetchall() :
        collector.append( self.expensive(c,v) ) 

def method(self):

    # create a Pool
    #join the Pool ??

    self.db.query('SELECT ...')
    for c in self.db.cursor.fetchall() :
        collector = []

        #RUN the whole cycle in parallel in separate processes
        self.inner_loop(c, collector)

        #do stuff with the collector

    #! close the pool ?

外循环和内循环都有数千步...... 我想我了解如何运行一个包含几个进程的池。 我发现的所有例子都或多或少地表明了这一点。

但在我的情况下,我需要午餐一个持久池,然后提供数据(c 值)。一旦内循环过程完成,我必须提供下一个可用的 c 值。 并保持流程运行并收集结果。

我该怎么做?


我有个笨拙的想法是:

def method(self):
 ws = 4
 with Pool(processes=ws) as pool :
     cs = []
     for i,c in enumerate(..) :
       cs.append(c)  
       if i % ws == 0 :
         res = [pool.apply(self.inner_loop, (c)) for i in range(ws)]
         cs = []
         collector.append(res)

这会保持同一个池运行吗?即不是每次都午餐新进程?i


我是否需要 'if i % ws == 0' 部分,或者我可以使用 imap()、map_async() 和 Pool obj 将在可用工作人员用尽时阻塞循环,并在释放一些工作人员时继续?

【问题讨论】:

你见过docs.python.org/3/library/… 吗?如果是这样,您能否详细说明您特别困惑的地方? 我的主要困惑是你如何划分 N-c 值并保持 N-进程的馈送。 for 循环也必须一次输入 N 个值,但它一次生成一个 【参考方案1】:

是的,multiprocessing.Pool 的工作方式是:

池中的工作进程通常在池的工作队列的整个持续时间内都存在。

因此,只需通过imap 将所有工作提交到池中就足够了:

with Pool(processes=4) as pool:
    initial_results = db.fetchall("SELECT c FROM outer")
    results = [pool.imap(self.inner_loop, (c,)) for c in initial_results]

也就是说,如果您真的这样做是为了从数据库中获取数据,那么可能将更多的处理向下移到该层(将计算带到数据中,而不是将数据到计算)。

【讨论】:

谢谢。在我的情况下,对大查询进行分区并预缓存初始批次并使用池使我的工作在 1 小时内完成,而 5++ 小时;)

以上是关于多进程:持久池?的主要内容,如果未能解决你的问题,请参考以下文章

python:多进程,多进程队列,多进程管道,Manager,进程锁,进程池

python3多进程和进程池

多进程 multiprocessing 多线程Threading 线程池和进程池concurrent.futures

多线程多进程和线程池编程

Python 多进程进程池Queue进程通信

Python_多进程_pool进程池