多处理:如何在类中定义的函数上使用 Pool.map?

Posted

技术标签:

【中文标题】多处理:如何在类中定义的函数上使用 Pool.map?【英文标题】:Multiprocessing: How to use Pool.map on a function defined in a class? 【发布时间】:2011-03-18 08:17:22 【问题描述】:

当我运行类似的东西时:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

它工作正常。然而,把它作为一个类的函数:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

给我以下错误:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我看过 Alex Martelli 的一篇帖子处理了同样的问题,但不够明确。

【问题讨论】:

“这是一个类的函数”?你能发布实际得到实际错误的代码吗?如果没有实际代码,我们只能猜测您做错了什么。 一般来说,存在比 Python 的标准 pickle 模块更强大的 pickling 模块(如 this answer 中提到的 picloud 模块)。 我在IPython.Parallel 中遇到了类似的闭包问题,但是您可以通过将对象推送到节点来解决这个问题。用多处理解决这个问题似乎很烦人。 这里calculate 是可腌制的,所以这似乎可以通过以下方式解决:1)使用复制calculate 实例的构造函数创建一个函数对象,然后2)传递这个实例Poolmap 方法的函数对象。没有? @math 我不相信 Python 的任何“最近的变化”会有任何帮助。 multiprocessing 模块的一些限制是由于它的目标是跨平台实现,并且在 Windows 中缺少类似 fork(2) 的系统调用。如果您不关心 Win32 支持,可能有更简单的基于流程的解决方法。或者,如果您准备使用线程而不是进程,则可以将 from multiprocessing import Pool 替换为 from multiprocessing.pool import ThreadPool as Pool 【参考方案1】:

您可以简单地将self 绑定到具有partial(MyClass.worker, self) 的成员函数并调用它:

import multiprocessing as mp
from functools import partial

class MyClass():
    def __init__(self):
        self.a = 0.1
    
    def function(self, x):
        print(self.a * x, self)
    
    def run(self):
        self.a = 1.0
        f = partial(MyClass.function, self)
        f(1)
        with mp.Pool(2) as p:
            p.map(f, [2, 3, 4, 5])
        print(self)

c = MyClass()
c.run()

输出:

1.0 <__main__.MyClass object at 0x7f2cec1d80f0>
3.0 <__main__.MyClass object at 0x7f2cec1db128>
2.0 <__main__.MyClass object at 0x7f2cec1db160>
4.0 <__main__.MyClass object at 0x7f2cec1db0f0>
5.0 <__main__.MyClass object at 0x7f2cec1db3c8>
<__main__.MyClass object at 0x7f2cec1d80f0>

似乎Pool.map 创建了MyClass 对象的副本。因此,您不能写入实例c 的成员。

【讨论】:

【参考方案2】:

我还对 pool.map 可以接受的函数类型的限制感到恼火。我写了以下内容来规避这一点。它似乎可以工作,即使对于 parmap 的递归使用也是如此。

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))

【讨论】:

这对我来说效果很好,谢谢。我发现了一个弱点:我尝试在一些传递 defaultdict 并再次得到 PicklingError 的函数上使用 parmap。我没有找到解决方案,我只是修改了我的代码以不使用 defaultdict。 这在 Python 2.7.2 中不起作用(默认,2011 年 6 月 12 日,15:08:59)[MSC v.1500 32 bit (Intel)] on win32 这确实适用于 Python 2.7.3 2012 年 8 月 1 日 05:14:39。这不适用于巨型迭代 -> 它会导致 OSError: [Errno 24] Too many open files due to the number of pipelines. 此解决方案为每个工作项生成一个进程。下面“克劳斯”的解决方案效率更高。 是我还是这个解决方案中没有类?那么它是否回答了最初的问题?【参考方案3】:

这是我为在 python3 中使用多处理池而编写的样板,特别是 python3.7.7 用于运行测试。我使用imap_unordered 获得了最快的运行速度。只需插入您的场景并尝试一下。您可以使用timeit 或仅使用time.time() 来找出最适合您的方法。

import multiprocessing
import time

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap'  # 'imap_unordered' or 'starmap' or 'apply_async'

def process_chunk(a_chunk):
    print(f"processig mp chunk a_chunk")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

s = time.time()
if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
    result_sum = sum(result_sum)
print(f"result_sum is result_sum, took time.time() - ss")

在上述情况下,imap_unordered 实际上对我来说似乎表现最差。试用您的案例并在您计划运行它的机器上对其进行基准测试。另请阅读Process Pools。干杯!

【讨论】:

【参考方案4】:

我知道这个问题是在 8 年零 10 个月前提出的,但我想向您展示我的解决方案:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

你只需要将你的类函数变成一个静态方法。但也可以使用类方法:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

在 Python 3.7.3 中测试

【讨论】:

【参考方案5】:

这可能不是一个很好的解决方案,但就我而言,我是这样解决的。

from multiprocessing import Pool

def foo1(data):
    self = data.get('slf')
    lst = data.get('lst')
    return sum(lst) + self.foo2()

class Foo(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def foo2(self):
        return self.a**self.b   

    def foo(self):
        p = Pool(5)
        lst = [1, 2, 3]
        result = p.map(foo1, (dict(slf=self, lst=lst),))
        return result

if __name__ == '__main__':
    print(Foo(2, 4).foo())

我必须将self 传递给我的函数,因为我必须通过该函数访问我的类的属性和函数。这对我有用。随时欢迎更正和建议。

【讨论】:

【参考方案6】:

如果您以某种方式手动忽略类中对象列表中的Pool 对象,则可以毫无问题地运行您的代码,因为它不是pickleable,如错误所述。您可以使用__getstate__ 函数(也可以查看here)执行此操作,如下所示。 Pool 对象将尝试查找 __getstate____setstate__ 函数并在您运行 mapmap_async 等时执行它们:

class calculate(object):
    def __init__(self):
        self.p = Pool()
    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['p']
        return self_dict
    def __setstate__(self, state):
        self.__dict__.update(state)

    def f(self, x):
        return x*x
    def run(self):
        return self.p.map(self.f, [1,2,3])

然后做:

cl = calculate()
cl.run()

会给你输出:

[1, 4, 9]

我已经在 Python 3.x 中测试了上面的代码,它可以工作。

【讨论】:

非常有趣的方法,它确实有效,但由于某种原因,它比仅在全局范围内定义谓词函数要慢几个数量级。【参考方案7】:

来自http://www.rueckstiess.net/research/snippets/show/ca1d7d90 和http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

我们可以创建一个外部函数并使用类 self 对象对其进行播种:

from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)

class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs= -1, backend="threading")\
            (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
        print(results)

或者没有 joblib:

from multiprocessing import Pool
import time

def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)

class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))

if __name__ == '__main__':
    c = C()
    c.run()

【讨论】:

【参考方案8】:

这是我的解决方案,我认为它比这里的大多数其他解决方案要少一些。和夜猫子的回答差不多。

someclasses = [MyClass(), MyClass(), MyClass()]

def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()

othermethod = partial(method_caller, some_method='othermethod')

with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)

【讨论】:

【参考方案9】:

我无法使用到目前为止发布的代码,因为使用“multiprocessing.Pool”的代码不适用于 lambda 表达式,并且不使用“multiprocessing.Pool”的代码会产生与工作项一样多的进程。

我修改了代码 s.t.它产生预定义数量的工作人员,并且仅在存在空闲工作人员时才遍历输入列表。我还为工人 s.t. 启用了“守护程序”模式。 ctrl-c 按预期工作。

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))

【讨论】:

你如何获得一个进度条来正确使用这个parmap函数? 一个问题——我使用了这个解决方案,但注意到我生成的 python 进程在内存中保持活动状态。关于如何在你的 parmap 退出时杀死那些人有什么快速的想法吗? @klaus-se 我知道我们不鼓励在 cmets 中说谢谢,但你的回答对我来说太有价值了,我无法抗拒。我希望我能给你的不仅仅是一个声誉...... @greole 将(None, None) 作为最后一项传递给fun,表明它已到达每个进程的项目序列的末尾。 @deshtop:如果你自己有足够的声誉,你可以得到赏金:-)【参考方案10】:

除非您跳出标准库,否则多处理和酸洗会受到破坏和限制。

如果使用multiprocessing 的fork,称为pathos.multiprocesssing,则可以直接在多处理的map 函数中使用类和类方法。这是因为dill 被用来代替picklecPickle,而dill 几乎可以在python 中序列化任何东西。

pathos.multiprocessing 还提供了一个异步映射函数……它可以map 具有多个参数的函数(例如map(math.pow, [1,2,3], [4,5,6])

查看讨论: What can multiprocessing and dill do together?

和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

它甚至可以处理您最初编写的代码,无需修改,并且来自解释器。为什么还要做其他更脆弱和特定于单个案例的代码?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

在此处获取代码: https://github.com/uqfoundation/pathos

而且,只是为了展示它的更多功能:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]

【讨论】:

pathos.multiprocessing 还有一个异步映射 (amap),可以使用进度条和其他异步编程。 我喜欢 pathos.multiprocessing,在享受多重处理的同时,它几乎可以替代非并行地图。我有一个简单的 pathos.multiprocessing.map 包装器,这样在处理跨多个内核的只读大型数据结构时,它的内存效率更高,请参阅this git repository。 看起来很有趣,但它没有安装。这是 pip 给出的消息:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos) 是的。我已经有一段时间没有发布了,因为我已经将功能拆分为单独的包,并且还转换为 2/3 兼容代码。以上大部分内容已在multiprocess 中进行了模块化,兼容 2/3。见***.com/questions/27873093/… 和pypi.python.org/pypi/multiprocess。 @xApple:作为后续,pathos 有了一个新的稳定版本,并且兼容 2.x 和 3.x。【参考方案11】:

我知道这是 6 年前提出的问题,但只是想添加我的解决方案,因为上面的一些建议看起来非常复杂,但我的解决方案实际上非常简单。

我所要做的就是将 pool.map() 调用包装到一个辅助函数中。将类对象与方法的参数一起作为元组传递,看起来有点像这样。

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)

【讨论】:

【参考方案12】:
class Calculate(object):
  # Your instance method to be executed
  def f(self, x, y):
    return x*y

if __name__ == '__main__':
  inp_list = [1,2,3]
  y = 2
  cal_obj = Calculate()
  pool = Pool(2)
  results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

您可能希望对类的每个不同实例应用此函数。那么这也是解决方案

class Calculate(object):
  # Your instance method to be executed
  def __init__(self, x):
    self.x = x

  def f(self, y):
    return self.x*y

if __name__ == '__main__':
  inp_list = [Calculate(i) for i in range(3)]
  y = 2
  pool = Pool(2)
  results = pool.map(lambda x: x.f(y), inp_list)

【讨论】:

【参考方案13】:

我接受了 klaus se 和 aganders3 的回答,并制作了一个文档化的模块,该模块更具可读性并保存在一个文件中。您可以将其添加到您的项目中。它甚至还有一个可选的进度条!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://***.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

编辑:添加了@alexander-mcfarlane 建议和测试功能

【讨论】:

进度条的一个问题... 该进度条仅衡量工作负载在处理器之间分配的效率如何。如果工作负载被完美分割,那么所有处理器将同时join(),您将在tqdm 显示中完成100% 的闪烁。唯一有用的是每个处理器都有一个有偏差的工作负载 移动 tqdm() 换行:result = [q_out.get() for _ in tqdm(sent)] 效果更好 - 付出了很大的努力,但非常感谢 +1 感谢您的建议,我会尝试然后更新答案! 答案更新了,进度条效果好多了! 我不知道为什么,但是尝试这个 sn-p 时出错! _pickle.PicklingError: Can't pickle &lt;function &lt;lambda&gt; at 0x000001717B311E18&gt;: attribute lookup &lt;lambda&gt; on __main__ failed【参考方案14】:

我不确定是否已采用这种方法,但我正在使用的解决方法是:

from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

输出应该是:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81

【讨论】:

【参考方案15】:

我修改了 klaus se 的方法,因为当它适用于我的小列表时,当项目数约为 1000 或更多时它会挂起。我没有使用None 停止条件一次推送一个作业,而是一次加载所有输入队列,然后让进程咀嚼它直到它为空。

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

编辑:不幸的是,现在我在我的系统上遇到了这个错误:Multiprocessing Queue maxsize limit is 32767,希望那里的解决方法会有所帮助。

【讨论】:

【参考方案16】:

在类中定义的函数(甚至在类中的函数内)并没有真正腌制。但是,这是可行的:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

【讨论】:

谢谢,但我觉得在类外定义函数有点脏。该类应捆绑完成给定任务所需的所有内容。 @Memoz:“课程应该捆绑所有它需要的东西”真的吗?我找不到很多这样的例子。大多数类依赖于其他类或函数。为什么将类依赖称为“脏”?依赖有什么问题? 好吧,函数不应该修改现有的类数据——因为它会修改其他进程中的版本——所以它可以是一个静态方法。您可以腌制一个静态方法:***.com/questions/1914261/… 或者,对于这种微不足道的事情,您可以使用 lambda。【参考方案17】:

mrule的解决方法是正确的,但是有个bug:如果child发回大量数据,会填满pipe的buffer,阻塞child的pipe.send(),而parent在等待child退出在pipe.join()。解决方案是在join()ing 孩子之前读取孩子的数据。此外,孩子应该关闭父母的管道末端以防止死锁。下面的代码解决了这个问题。另请注意,parmap 会为X 中的每个元素创建一个进程。更高级的解决方案是使用multiprocessing.cpu_count()X 分成若干个chunk,然后将结果合并后再返回。我将其作为练习留给读者,以免破坏 mrule 好答案的简洁性。 ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))

【讨论】:

进程数如何选择? 但是由于错误OSError: [Errno 24] Too many open files,它很快就死了。我认为需要对进程数量进行某种限制才能正常工作......【参考方案18】:

我也为此苦苦挣扎。作为一个简化的示例,我将函数作为类的数据成员:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

我需要在同一个类中的 Pool.map() 调用中使用函数 self.f,而 self.f 没有将元组作为参数。由于此函数嵌入在一个类中,我不清楚如何编写其他答案建议的包装器类型。

我通过使用不同的包装器解决了这个问题,该包装器采用元组/列表,其中第一个元素是函数,其余元素是该函数的参数,称为 eval_func_tuple(f_args)。使用它,有问题的行可以替换为 return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2))。完整代码如下:

文件:util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

文件:main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

运行 main.py 将给出 [11, 22, 33]。随意改进这一点,例如 eval_func_tuple 也可以修改为采用关键字参数。

另一方面,在另一个答案中,对于更多进程而不是可用 CPU 数量的情况,可以使函数“parmap”更有效。我正在复制下面的编辑版本。这是我的第一篇文章,我不确定是否应该直接编辑原始答案。我还重命名了一些变量。

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         

【讨论】:

【参考方案19】:

据我所知,目前没有解决您的问题的方法:您提供给map() 的函数必须可以通过导入您的模块来访问。这就是robert的代码起作用的原因:函数f()可以通过导入以下代码得到:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

我实际上添加了一个“主”部分,因为它遵循recommendations for the Windows platform(“确保主模块可以由新的 Python 解释器安全导入而不会导致意外的副作用”)。

我还在Calculate前面加了一个大写字母,以便跟在PEP 8后面。 :)

【讨论】:

以上是关于多处理:如何在类中定义的函数上使用 Pool.map?的主要内容,如果未能解决你的问题,请参考以下文章

多处理具有多个输入的函数

使用在类中定义的有返回值的函数,如何得到执行结果反馈?

如何在类向量中存储/使用外部函数指针

如何在封装在类中的回调函数上发送指针

python_如何在类中定义装饰器

使用变量在类中存储自定义函数