多处理:如何在类中定义的函数上使用 Pool.map?
Posted
技术标签:
【中文标题】多处理:如何在类中定义的函数上使用 Pool.map?【英文标题】:Multiprocessing: How to use Pool.map on a function defined in a class? 【发布时间】:2011-03-18 08:17:22 【问题描述】:当我运行类似的东西时:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
它工作正常。然而,把它作为一个类的函数:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
给我以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我看过 Alex Martelli 的一篇帖子处理了同样的问题,但不够明确。
【问题讨论】:
“这是一个类的函数”?你能发布实际得到实际错误的代码吗?如果没有实际代码,我们只能猜测您做错了什么。 一般来说,存在比 Python 的标准 pickle 模块更强大的 pickling 模块(如 this answer 中提到的 picloud 模块)。 我在IPython.Parallel
中遇到了类似的闭包问题,但是您可以通过将对象推送到节点来解决这个问题。用多处理解决这个问题似乎很烦人。
这里calculate
是可腌制的,所以这似乎可以通过以下方式解决:1)使用复制calculate
实例的构造函数创建一个函数对象,然后2)传递这个实例Pool
的 map
方法的函数对象。没有?
@math 我不相信 Python 的任何“最近的变化”会有任何帮助。 multiprocessing
模块的一些限制是由于它的目标是跨平台实现,并且在 Windows 中缺少类似 fork(2)
的系统调用。如果您不关心 Win32 支持,可能有更简单的基于流程的解决方法。或者,如果您准备使用线程而不是进程,则可以将 from multiprocessing import Pool
替换为 from multiprocessing.pool import ThreadPool as Pool
。
【参考方案1】:
您可以简单地将self
绑定到具有partial(MyClass.worker, self)
的成员函数并调用它:
import multiprocessing as mp
from functools import partial
class MyClass():
def __init__(self):
self.a = 0.1
def function(self, x):
print(self.a * x, self)
def run(self):
self.a = 1.0
f = partial(MyClass.function, self)
f(1)
with mp.Pool(2) as p:
p.map(f, [2, 3, 4, 5])
print(self)
c = MyClass()
c.run()
输出:
1.0 <__main__.MyClass object at 0x7f2cec1d80f0>
3.0 <__main__.MyClass object at 0x7f2cec1db128>
2.0 <__main__.MyClass object at 0x7f2cec1db160>
4.0 <__main__.MyClass object at 0x7f2cec1db0f0>
5.0 <__main__.MyClass object at 0x7f2cec1db3c8>
<__main__.MyClass object at 0x7f2cec1d80f0>
似乎Pool.map
创建了MyClass
对象的副本。因此,您不能写入实例c
的成员。
【讨论】:
【参考方案2】:我还对 pool.map 可以接受的函数类型的限制感到恼火。我写了以下内容来规避这一点。它似乎可以工作,即使对于 parmap 的递归使用也是如此。
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe, x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f, X):
pipe = [Pipe() for x in X]
proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
[p.start() for p in proc]
[p.join() for p in proc]
return [p.recv() for (p, c) in pipe]
if __name__ == '__main__':
print parmap(lambda x: x**x, range(1, 5))
【讨论】:
这对我来说效果很好,谢谢。我发现了一个弱点:我尝试在一些传递 defaultdict 并再次得到 PicklingError 的函数上使用 parmap。我没有找到解决方案,我只是修改了我的代码以不使用 defaultdict。 这在 Python 2.7.2 中不起作用(默认,2011 年 6 月 12 日,15:08:59)[MSC v.1500 32 bit (Intel)] on win32 这确实适用于 Python 2.7.3 2012 年 8 月 1 日 05:14:39。这不适用于巨型迭代 -> 它会导致 OSError: [Errno 24] Too many open files due to the number of pipelines. 此解决方案为每个工作项生成一个进程。下面“克劳斯”的解决方案效率更高。 是我还是这个解决方案中没有类?那么它是否回答了最初的问题?【参考方案3】:这是我为在 python3 中使用多处理池而编写的样板,特别是 python3.7.7 用于运行测试。我使用imap_unordered
获得了最快的运行速度。只需插入您的场景并尝试一下。您可以使用timeit
或仅使用time.time()
来找出最适合您的方法。
import multiprocessing
import time
NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap' # 'imap_unordered' or 'starmap' or 'apply_async'
def process_chunk(a_chunk):
print(f"processig mp chunk a_chunk")
return a_chunk
map_jobs = [1, 2, 3, 4]
result_sum = 0
s = time.time()
if MP_FUNCTION == 'imap_unordered':
pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
for i in pool.imap_unordered(process_chunk, map_jobs):
result_sum += i
elif MP_FUNCTION == 'starmap':
pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
try:
map_jobs = [(i, ) for i in map_jobs]
result_sum = pool.starmap(process_chunk, map_jobs)
result_sum = sum(result_sum)
finally:
pool.close()
pool.join()
elif MP_FUNCTION == 'apply_async':
with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
result_sum = sum(result_sum)
print(f"result_sum is result_sum, took time.time() - ss")
在上述情况下,imap_unordered
实际上对我来说似乎表现最差。试用您的案例并在您计划运行它的机器上对其进行基准测试。另请阅读Process Pools。干杯!
【讨论】:
【参考方案4】:我知道这个问题是在 8 年零 10 个月前提出的,但我想向您展示我的解决方案:
from multiprocessing import Pool
class Test:
def __init__(self):
self.main()
@staticmethod
def methodForMultiprocessing(x):
print(x*x)
def main(self):
if __name__ == "__main__":
p = Pool()
p.map(Test.methodForMultiprocessing, list(range(1, 11)))
p.close()
TestObject = Test()
你只需要将你的类函数变成一个静态方法。但也可以使用类方法:
from multiprocessing import Pool
class Test:
def __init__(self):
self.main()
@classmethod
def methodForMultiprocessing(cls, x):
print(x*x)
def main(self):
if __name__ == "__main__":
p = Pool()
p.map(Test.methodForMultiprocessing, list(range(1, 11)))
p.close()
TestObject = Test()
在 Python 3.7.3 中测试
【讨论】:
【参考方案5】:这可能不是一个很好的解决方案,但就我而言,我是这样解决的。
from multiprocessing import Pool
def foo1(data):
self = data.get('slf')
lst = data.get('lst')
return sum(lst) + self.foo2()
class Foo(object):
def __init__(self, a, b):
self.a = a
self.b = b
def foo2(self):
return self.a**self.b
def foo(self):
p = Pool(5)
lst = [1, 2, 3]
result = p.map(foo1, (dict(slf=self, lst=lst),))
return result
if __name__ == '__main__':
print(Foo(2, 4).foo())
我必须将self
传递给我的函数,因为我必须通过该函数访问我的类的属性和函数。这对我有用。随时欢迎更正和建议。
【讨论】:
【参考方案6】:如果您以某种方式手动忽略类中对象列表中的Pool
对象,则可以毫无问题地运行您的代码,因为它不是pickle
able,如错误所述。您可以使用__getstate__
函数(也可以查看here)执行此操作,如下所示。 Pool
对象将尝试查找 __getstate__
和 __setstate__
函数并在您运行 map
、map_async
等时执行它们:
class calculate(object):
def __init__(self):
self.p = Pool()
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['p']
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
def f(self, x):
return x*x
def run(self):
return self.p.map(self.f, [1,2,3])
然后做:
cl = calculate()
cl.run()
会给你输出:
[1, 4, 9]
我已经在 Python 3.x 中测试了上面的代码,它可以工作。
【讨论】:
非常有趣的方法,它确实有效,但由于某种原因,它比仅在全局范围内定义谓词函数要慢几个数量级。【参考方案7】:来自http://www.rueckstiess.net/research/snippets/show/ca1d7d90 和http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
我们可以创建一个外部函数并使用类 self 对象对其进行播种:
from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
return square_class.square_int(*arg, **kwarg)
class square_class:
def square_int(self, i):
return i * i
def run(self, num):
results = []
results = Parallel(n_jobs= -1, backend="threading")\
(delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
print(results)
或者没有 joblib:
from multiprocessing import Pool
import time
def unwrap_self_f(arg, **kwarg):
return C.f(*arg, **kwarg)
class C:
def f(self, name):
print 'hello %s,'%name
time.sleep(5)
print 'nice to meet you.'
def run(self):
pool = Pool(processes=2)
names = ('frank', 'justin', 'osi', 'thomas')
pool.map(unwrap_self_f, zip([self]*len(names), names))
if __name__ == '__main__':
c = C()
c.run()
【讨论】:
【参考方案8】:这是我的解决方案,我认为它比这里的大多数其他解决方案要少一些。和夜猫子的回答差不多。
someclasses = [MyClass(), MyClass(), MyClass()]
def method_caller(some_object, some_method='the method'):
return getattr(some_object, some_method)()
othermethod = partial(method_caller, some_method='othermethod')
with Pool(6) as pool:
result = pool.map(othermethod, someclasses)
【讨论】:
【参考方案9】:我无法使用到目前为止发布的代码,因为使用“multiprocessing.Pool”的代码不适用于 lambda 表达式,并且不使用“multiprocessing.Pool”的代码会产生与工作项一样多的进程。
我修改了代码 s.t.它产生预定义数量的工作人员,并且仅在存在空闲工作人员时才遍历输入列表。我还为工人 s.t. 启用了“守护程序”模式。 ctrl-c 按预期工作。
import multiprocessing
def fun(f, q_in, q_out):
while True:
i, x = q_in.get()
if i is None:
break
q_out.put((i, f(x)))
def parmap(f, X, nprocs=multiprocessing.cpu_count()):
q_in = multiprocessing.Queue(1)
q_out = multiprocessing.Queue()
proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
for _ in range(nprocs)]
for p in proc:
p.daemon = True
p.start()
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[q_in.put((None, None)) for _ in range(nprocs)]
res = [q_out.get() for _ in range(len(sent))]
[p.join() for p in proc]
return [x for i, x in sorted(res)]
if __name__ == '__main__':
print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
【讨论】:
你如何获得一个进度条来正确使用这个parmap
函数?
一个问题——我使用了这个解决方案,但注意到我生成的 python 进程在内存中保持活动状态。关于如何在你的 parmap 退出时杀死那些人有什么快速的想法吗?
@klaus-se 我知道我们不鼓励在 cmets 中说谢谢,但你的回答对我来说太有价值了,我无法抗拒。我希望我能给你的不仅仅是一个声誉......
@greole 将(None, None)
作为最后一项传递给fun
,表明它已到达每个进程的项目序列的末尾。
@deshtop:如果你自己有足够的声誉,你可以得到赏金:-)【参考方案10】:
除非您跳出标准库,否则多处理和酸洗会受到破坏和限制。
如果使用multiprocessing
的fork,称为pathos.multiprocesssing
,则可以直接在多处理的map
函数中使用类和类方法。这是因为dill
被用来代替pickle
或cPickle
,而dill
几乎可以在python 中序列化任何东西。
pathos.multiprocessing
还提供了一个异步映射函数……它可以map
具有多个参数的函数(例如map(math.pow, [1,2,3], [4,5,6])
)
查看讨论: What can multiprocessing and dill do together?
和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
它甚至可以处理您最初编写的代码,无需修改,并且来自解释器。为什么还要做其他更脆弱和特定于单个案例的代码?
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
... def run(self):
... def f(x):
... return x*x
... p = Pool()
... return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]
在此处获取代码: https://github.com/uqfoundation/pathos
而且,只是为了展示它的更多功能:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
【讨论】:
pathos.multiprocessing 还有一个异步映射 (amap
),可以使用进度条和其他异步编程。
我喜欢 pathos.multiprocessing,在享受多重处理的同时,它几乎可以替代非并行地图。我有一个简单的 pathos.multiprocessing.map 包装器,这样在处理跨多个内核的只读大型数据结构时,它的内存效率更高,请参阅this git repository。
看起来很有趣,但它没有安装。这是 pip 给出的消息:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
是的。我已经有一段时间没有发布了,因为我已经将功能拆分为单独的包,并且还转换为 2/3 兼容代码。以上大部分内容已在multiprocess
中进行了模块化,兼容 2/3。见***.com/questions/27873093/… 和pypi.python.org/pypi/multiprocess。
@xApple:作为后续,pathos
有了一个新的稳定版本,并且兼容 2.x 和 3.x。【参考方案11】:
我知道这是 6 年前提出的问题,但只是想添加我的解决方案,因为上面的一些建议看起来非常复杂,但我的解决方案实际上非常简单。
我所要做的就是将 pool.map() 调用包装到一个辅助函数中。将类对象与方法的参数一起作为元组传递,看起来有点像这样。
def run_in_parallel(args):
return args[0].method(args[1])
myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
【讨论】:
【参考方案12】:class Calculate(object):
# Your instance method to be executed
def f(self, x, y):
return x*y
if __name__ == '__main__':
inp_list = [1,2,3]
y = 2
cal_obj = Calculate()
pool = Pool(2)
results = pool.map(lambda x: cal_obj.f(x, y), inp_list)
您可能希望对类的每个不同实例应用此函数。那么这也是解决方案
class Calculate(object):
# Your instance method to be executed
def __init__(self, x):
self.x = x
def f(self, y):
return self.x*y
if __name__ == '__main__':
inp_list = [Calculate(i) for i in range(3)]
y = 2
pool = Pool(2)
results = pool.map(lambda x: x.f(y), inp_list)
【讨论】:
【参考方案13】:我接受了 klaus se 和 aganders3 的回答,并制作了一个文档化的模块,该模块更具可读性并保存在一个文件中。您可以将其添加到您的项目中。它甚至还有一个可选的进度条!
"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.
Adapted from http://***.com/a/16071616/287297
Example usage:
print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)
Comments:
"It spawns a predefined amount of workers and only iterates through the input list
if there exists an idle worker. I also enabled the "daemon" mode for the workers so
that KeyboardInterupt works as expected."
Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.
Alternatively, use this fork of multiprocessing:
https://github.com/uqfoundation/multiprocess
"""
# Modules #
import multiprocessing
from tqdm import tqdm
################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
while not queue_in.empty():
num, obj = queue_in.get()
queue_out.put((num, func_to_apply(obj)))
################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
# Number of processes to use #
if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
# Create queues #
q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
# Process list #
new_proc = lambda t,a: multiprocessing.Process(target=t, args=a)
processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
# Put all the items (objects) in the queue #
sent = [q_in.put((i, x)) for i, x in enumerate(items)]
# Start them all #
for proc in processes:
proc.daemon = True
proc.start()
# Display progress bar or not #
if verbose:
results = [q_out.get() for x in tqdm(range(len(sent)))]
else:
results = [q_out.get() for x in range(len(sent))]
# Wait for them to finish #
for proc in processes: proc.join()
# Return results #
return [x for i, x in sorted(results)]
################################################################################
def test():
def slow_square(x):
import time
time.sleep(2)
return x**2
objs = range(20)
squares = prll_map(slow_square, objs, 4, verbose=True)
print "Result: %s" % squares
编辑:添加了@alexander-mcfarlane 建议和测试功能
【讨论】:
进度条的一个问题... 该进度条仅衡量工作负载在处理器之间分配的效率如何。如果工作负载被完美分割,那么所有处理器将同时join()
,您将在tqdm
显示中完成100%
的闪烁。唯一有用的是每个处理器都有一个有偏差的工作负载
移动 tqdm()
换行:result = [q_out.get() for _ in tqdm(sent)]
效果更好 - 付出了很大的努力,但非常感谢 +1
感谢您的建议,我会尝试然后更新答案!
答案更新了,进度条效果好多了!
我不知道为什么,但是尝试这个 sn-p 时出错! _pickle.PicklingError: Can't pickle <function <lambda> at 0x000001717B311E18>: attribute lookup <lambda> on __main__ failed
【参考方案14】:
我不确定是否已采用这种方法,但我正在使用的解决方法是:
from multiprocessing import Pool
t = None
def run(n):
return t.f(n)
class Test(object):
def __init__(self, number):
self.number = number
def f(self, x):
print x * self.number
def pool(self):
pool = Pool(2)
pool.map(run, range(10))
if __name__ == '__main__':
t = Test(9)
t.pool()
pool = Pool(2)
pool.map(run, range(10))
输出应该是:
0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
【讨论】:
【参考方案15】:我修改了 klaus se 的方法,因为当它适用于我的小列表时,当项目数约为 1000 或更多时它会挂起。我没有使用None
停止条件一次推送一个作业,而是一次加载所有输入队列,然后让进程咀嚼它直到它为空。
from multiprocessing import cpu_count, Queue, Process
def apply_func(f, q_in, q_out):
while not q_in.empty():
i, x = q_in.get()
q_out.put((i, f(x)))
# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
q_in, q_out = Queue(), Queue()
proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[p.start() for p in proc]
res = [q_out.get() for _ in sent]
[p.join() for p in proc]
return [x for i,x in sorted(res)]
编辑:不幸的是,现在我在我的系统上遇到了这个错误:Multiprocessing Queue maxsize limit is 32767,希望那里的解决方法会有所帮助。
【讨论】:
【参考方案16】:在类中定义的函数(甚至在类中的函数内)并没有真正腌制。但是,这是可行的:
def f(x):
return x*x
class calculate(object):
def run(self):
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
【讨论】:
谢谢,但我觉得在类外定义函数有点脏。该类应捆绑完成给定任务所需的所有内容。 @Memoz:“课程应该捆绑所有它需要的东西”真的吗?我找不到很多这样的例子。大多数类依赖于其他类或函数。为什么将类依赖称为“脏”?依赖有什么问题? 好吧,函数不应该修改现有的类数据——因为它会修改其他进程中的版本——所以它可以是一个静态方法。您可以腌制一个静态方法:***.com/questions/1914261/… 或者,对于这种微不足道的事情,您可以使用 lambda。【参考方案17】:mrule的解决方法是正确的,但是有个bug:如果child发回大量数据,会填满pipe的buffer,阻塞child的pipe.send()
,而parent在等待child退出在pipe.join()
。解决方案是在join()
ing 孩子之前读取孩子的数据。此外,孩子应该关闭父母的管道末端以防止死锁。下面的代码解决了这个问题。另请注意,parmap
会为X
中的每个元素创建一个进程。更高级的解决方案是使用multiprocessing.cpu_count()
将X
分成若干个chunk,然后将结果合并后再返回。我将其作为练习留给读者,以免破坏 mrule 好答案的简洁性。 ;)
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(ppipe, cpipe,x):
ppipe.close()
cpipe.send(f(x))
cpipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
[p.start() for p in proc]
ret = [p.recv() for (p,c) in pipe]
[p.join() for p in proc]
return ret
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
【讨论】:
进程数如何选择? 但是由于错误OSError: [Errno 24] Too many open files
,它很快就死了。我认为需要对进程数量进行某种限制才能正常工作......【参考方案18】:
我也为此苦苦挣扎。作为一个简化的示例,我将函数作为类的数据成员:
from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# Needed to do something like this (the following line won't work)
return pool.map(self.f,list1,list2)
我需要在同一个类中的 Pool.map() 调用中使用函数 self.f,而 self.f 没有将元组作为参数。由于此函数嵌入在一个类中,我不清楚如何编写其他答案建议的包装器类型。
我通过使用不同的包装器解决了这个问题,该包装器采用元组/列表,其中第一个元素是函数,其余元素是该函数的参数,称为 eval_func_tuple(f_args)。使用它,有问题的行可以替换为 return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2))。完整代码如下:
文件:util.py
def add(a, b): return a+b
def eval_func_tuple(f_args):
"""Takes a tuple of a function and args, evaluates and returns result"""
return f_args[0](*f_args[1:])
文件:main.py
from multiprocessing import Pool
import itertools
import util
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# The following line will now work
return pool.map(util.eval_func_tuple,
itertools.izip(itertools.repeat(self.f), list1, list2))
if __name__ == '__main__':
myExample = Example(util.add)
list1 = [1, 2, 3]
list2 = [10, 20, 30]
print myExample.add_lists(list1, list2)
运行 main.py 将给出 [11, 22, 33]。随意改进这一点,例如 eval_func_tuple 也可以修改为采用关键字参数。
另一方面,在另一个答案中,对于更多进程而不是可用 CPU 数量的情况,可以使函数“parmap”更有效。我正在复制下面的编辑版本。这是我的第一篇文章,我不确定是否应该直接编辑原始答案。我还重命名了一些变量。
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
numProcesses = len(processes)
processNum = 0
outputList = []
while processNum < numProcesses:
endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)
for proc in processes[processNum:endProcessNum]:
proc.start()
for proc in processes[processNum:endProcessNum]:
proc.join()
for proc,c in pipe[processNum:endProcessNum]:
outputList.append(proc.recv())
processNum = endProcessNum
return outputList
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
【讨论】:
【参考方案19】:据我所知,目前没有解决您的问题的方法:您提供给map()
的函数必须可以通过导入您的模块来访问。这就是robert的代码起作用的原因:函数f()
可以通过导入以下代码得到:
def f(x):
return x*x
class Calculate(object):
def run(self):
p = Pool()
return p.map(f, [1,2,3])
if __name__ == '__main__':
cl = Calculate()
print cl.run()
我实际上添加了一个“主”部分,因为它遵循recommendations for the Windows platform(“确保主模块可以由新的 Python 解释器安全导入而不会导致意外的副作用”)。
我还在Calculate
前面加了一个大写字母,以便跟在PEP 8后面。 :)
【讨论】:
以上是关于多处理:如何在类中定义的函数上使用 Pool.map?的主要内容,如果未能解决你的问题,请参考以下文章