如何恢复传递给 multiprocessing.Process 的函数的返回值?
Posted
技术标签:
【中文标题】如何恢复传递给 multiprocessing.Process 的函数的返回值?【英文标题】:How can I recover the return value of a function passed to multiprocessing.Process? 【发布时间】:2012-05-12 00:04:11 【问题描述】:在下面的示例代码中,我想恢复函数worker
的返回值。我该怎么做呢?这个值存储在哪里?
示例代码:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
我似乎在jobs
中存储的对象中找不到相关属性。
【问题讨论】:
【参考方案1】:使用shared variable 进行交流。比如这样:
import multiprocessing
def worker(procnum, return_dict):
"""worker function"""
print(str(procnum) + " represent!")
return_dict[procnum] = procnum
if __name__ == "__main__":
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print(return_dict.values())
【讨论】:
我建议在这里使用multiprocessing.Queue
,而不是Manager
。使用Manager
需要生成一个全新的进程,而Queue
可以这样做。
@dano : 我想知道,如果我们使用 Queue() 对象,我们无法确定每个进程返回值的顺序。我的意思是,如果我们需要结果中的顺序,来做下一个工作。我们如何确定哪个输出来自哪个进程
@Catbuilts 您可以从每个进程返回一个元组,其中一个值是您关心的实际返回值,另一个是进程的唯一标识符。但我也想知道为什么你需要知道哪个进程正在返回哪个值。如果这是您实际需要了解的流程,或者您是否需要在输入列表和输出列表之间建立关联?在这种情况下,我建议使用multiprocessing.Pool.map
来处理您的工作项列表。
只有一个参数的函数的警告:应该使用args=(my_function_argument, )
。注意这里的,
逗号!否则 Python 会抱怨“缺少位置参数”。我花了10分钟才弄清楚。还要检查manual usage(在“进程类”部分下)。
@vartec 使用 multipriocessing.Manager() 字典的一个缺点是它会腌制(序列化)它返回的对象,因此它具有由对象最大 2GiB 大小的腌制库给出的瓶颈返回。有没有其他方法可以避免返回对象的序列化?【参考方案2】:
我认为@sega_sai 建议的方法更好。但它确实需要一个代码示例,所以这里是:
import multiprocessing
from os import getpid
def worker(procnum):
print('I am number %d in process %d' % (procnum, getpid()))
return getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print(pool.map(worker, range(5)))
将打印返回值:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
如果您熟悉map
(Python 2 内置),这应该不会太具有挑战性。否则看看sega_Sai's link。
注意只需要很少的代码。 (还要注意如何重用流程)。
【讨论】:
任何想法为什么我的getpid()
返回所有相同的值?我正在运行 Python3
我不确定 Pool 如何将任务分配给工作人员。如果他们真的很快,也许他们都可以在同一个工人身上结束?它是否始终如一地发生?另外,如果您添加延迟?
我也认为这是与速度有关的事情,但是当我使用 10 多个进程向 pool.map
提供 1,000,000 的范围时,我最多看到两个不同的 pid。
那我不确定。我认为为此打开一个单独的问题会很有趣。
如果你想给每个进程发送不同的函数,使用pool.apply_async
:docs.python.org/3/library/…【参考方案3】:
对于寻求如何使用 Queue
从 Process
获取值的其他人:
import multiprocessing
ret = 'foo': False
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints "foo": True
请注意,在 Windows 或 Jupyter Notebook 中,使用 multithreading
您必须将其保存为文件并执行该文件。如果您在命令提示符下执行此操作,您将看到如下错误:
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
【讨论】:
当我在工作进程的队列中放入一些东西时,我的加入永远不会到达。知道这是怎么回事吗? @LaurensKoppenol 你的意思是你的主代码永久挂在 p.join() 并且永远不会继续吗?你的进程有无限循环吗? 是的,它无限挂在那里。我的工人全部完成(工人函数内的循环结束,之后为所有工人打印打印语句)。加入不做任何事情。如果我从我的函数中删除Queue
它确实让我通过 join()
@LaurensKoppenol 你是不是在打电话给p.start()
之前没有打电话给queue.put(ret)
?在这种情况下,工作线程将永远挂在queue.get()
。你可以通过复制我上面的 sn-p 来复制这个,同时注释掉 queue.put(ret)
。
@Bendemann 有人编辑了答案,并通过将queue.get
放在 queue.join 之前使其不正确。我现在通过在p.join
之后放置queue.get
来修复它。请重试。【参考方案4】:
出于某种原因,我在任何地方都找不到如何使用Queue
执行此操作的一般示例(即使 Python 的文档示例也不会产生多个进程),所以这是我尝试 10 次后得到的结果:
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue
是一个阻塞的线程安全队列,可用于存储子进程的返回值。所以你必须将队列传递给每个进程。这里不太明显的是,你必须先从队列中 get()
join
Process
es,否则队列会填满并阻塞所有内容。
更新面向对象的人(在 Python 3.4 中测试):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
【讨论】:
【参考方案5】:这个例子展示了如何使用multiprocessing.Pipe 实例列表从任意数量的进程返回字符串:
import multiprocessing
def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == '__main__':
main()
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
此解决方案使用的资源比 multiprocessing.Queue 使用的资源少
一根管子 至少一个锁 缓冲区 一个线程或使用multiprocessing.SimpleQueue
一根管子 至少一个锁查看每种类型的来源非常有启发性。
【讨论】:
不使管道成为全局变量的最佳方法是什么? 我把所有的全局数据和代码放到一个主函数中,它的工作原理是一样的。这能回答你的问题吗? 在向管道添加(发送)任何新值之前是否总是必须读取管道? 如果返回的对象很大,这个答案会导致死锁。与其先做 proc.join(),不如先尝试 recv() 返回值,然后再做 join。 我支持@L.Pes。可能是特定于操作系统的,但我将此示例改编为我的用例,并且尝试 send_end.send(result) 以获得较大结果的工作人员将无限期挂起。收到固定后加入。如果 N=2 对您来说太有趣了,我们很乐意提供一个例子。【参考方案6】:您可以使用内置的exit
来设置进程的退出代码。可以从进程的exitcode
属性中获取:
import multiprocessing
def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
【讨论】:
请注意,这种方法可能会令人困惑。进程通常应该以退出代码 0 退出,如果它们完成且没有错误。如果您有任何监控系统进程退出代码的东西,那么您可能会看到这些报告为错误。 如果您只想在父进程出错时引发异常,那就完美了。【参考方案7】:看来您应该改用multiprocessing.Pool 类并使用方法 .apply() .apply_async(), map()
http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
【讨论】:
我有 tensorflow 代码,multiprocessing.Pool 将挂起但 multiprocessing.Process 不会挂起【参考方案8】:pebble 包有一个很好的抽象利用 multiprocessing.Pipe
,这使得这非常简单:
from pebble import concurrent
@concurrent.process
def function(arg, kwarg=0):
return arg + kwarg
future = function(1, kwarg=1)
print(future.result())
示例来自:https://pythonhosted.org/Pebble/#concurrent-decorators
【讨论】:
【参考方案9】:我想我会简化从上面复制的最简单的例子,在 Py3.6 上为我工作。最简单的是multiprocessing.Pool
:
import multiprocessing
import time
def worker(x):
time.sleep(1)
return x
pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))
您可以设置池中的进程数,例如,Pool(processes=5)
。但是它默认为 CPU 计数,因此对于 CPU 密集型任务将其留空。 (无论如何,I/O 密集型任务通常适合线程,因为线程大部分时间都在等待,因此可以共享一个 CPU 内核。)Pool
也适用于chunking optimization。
(请注意,worker 方法不能嵌套在方法中。我最初在调用pool.map
的方法中定义了我的worker 方法,以使其完全独立,但随后进程无法导入它,并抛出“AttributeError:无法腌制本地对象outer_method..inner_method”。更多here。它可以在一个类中。)
(感谢指定打印 'represent!'
而不是 time.sleep()
的原始问题,但没有它,我认为某些代码不是同时运行的。)
Py3 的ProcessPoolExecutor
也是两行(.map
返回一个生成器,所以你需要list()
):
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(10))))
用普通的Process
es:
import multiprocessing
import time
def worker(x, queue):
time.sleep(1)
queue.put(x)
queue = multiprocessing.SimpleQueue()
tasks = range(10)
for task in tasks:
multiprocessing.Process(target=worker, args=(task, queue,)).start()
for _ in tasks:
print(queue.get())
如果您只需要put
和get
,请使用SimpleQueue
。第一个循环启动所有进程,然后第二个循环进行阻塞 queue.get
调用。我认为也没有任何理由打电话给p.join()
。
【讨论】:
【参考方案10】:一个简单的解决方案:
import multiprocessing
output=[]
data = range(0,10)
def f(x):
return x**2
def handler():
p = multiprocessing.Pool(64)
r=p.map(f, data)
return r
if __name__ == '__main__':
output.append(handler())
print(output[0])
输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
【讨论】:
【参考方案11】:如果您使用的是 Python 3,则可以使用 concurrent.futures.ProcessPoolExecutor
作为方便的抽象:
from concurrent.futures import ProcessPoolExecutor
def worker(procnum):
'''worker function'''
print(str(procnum) + ' represent!')
return procnum
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(5))))
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
【讨论】:
【参考方案12】:我稍微修改了 vartec 的答案,因为我需要从函数中获取错误代码。 (感谢vertec!!!这是一个很棒的技巧)
这也可以通过manager.list
来完成,但我认为最好将它放在一个字典中并在其中存储一个列表。这样,我们就可以保留函数和结果,因为我们无法确定列表的填充顺序。
from multiprocessing import Process
import time
import datetime
import multiprocessing
def func1(fn, m_list):
print 'func1: starting'
time.sleep(1)
m_list[fn] = "this is the first function"
print 'func1: finishing'
# return "func1" # no need for return since Multiprocess doesnt return it =(
def func2(fn, m_list):
print 'func2: starting'
time.sleep(3)
m_list[fn] = "this is function 2"
print 'func2: finishing'
# return "func2"
def func3(fn, m_list):
print 'func3: starting'
time.sleep(9)
# if fail wont join the rest because it never populate the dict
# or do a try/except to get something in return.
raise ValueError("failed here")
# if we want to get the error in the manager dict we can catch the error
try:
raise ValueError("failed here")
m_list[fn] = "this is third"
except:
m_list[fn] = "this is third and it fail horrible"
# print 'func3: finishing'
# return "func3"
def runInParallel(*fns): # * is to accept any input in list
start_time = datetime.datetime.now()
proc = []
manager = multiprocessing.Manager()
m_list = manager.dict()
for fn in fns:
# print fn
# print dir(fn)
p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
p.start()
proc.append(p)
for p in proc:
p.join() # 5 is the time out
print datetime.datetime.now() - start_time
return m_list, proc
if __name__ == '__main__':
manager, proc = runInParallel(func1, func2, func3)
# print dir(proc[0])
# print proc[0]._name
# print proc[0].name
# print proc[0].exitcode
# here you can check what did fail
for i in proc:
print i.name, i.exitcode # name was set up in the Process line 53
# here will only show the function that worked and where able to populate the
# manager dict
for i, j in manager.items():
print dir(i) # things you can do to the function
print i, j
【讨论】:
以上是关于如何恢复传递给 multiprocessing.Process 的函数的返回值?的主要内容,如果未能解决你的问题,请参考以下文章
连接暂停/恢复后未触发传递给 RemoteMediaPlayer.load(...).setResultCallback(...) 的回调