Python multiprocessing.Pool 忽略类方法

Posted

技术标签:

【中文标题】Python multiprocessing.Pool 忽略类方法【英文标题】:Python multiprocessing.Pool ignores class method 【发布时间】:2015-07-01 05:17:29 【问题描述】:

我最近为我的研究编写了一个带有类的程序,并且我尝试将它并行化。当我将 Python 2.7 的 multiprocessing.Process 与 JoinableQueue 和托管数据一起使用时,我的程序最终会因已失效的进程而挂起。

import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _calc_parallel(self, index):
        self._calc_bond(index)

    def run(self):
        for ts, force in itertools.izip(self.coortrj, self.forcevec):
        try:
            consumers = [mp.Process(target=self._calc_parallel,
                         args=(force,)) for i in range(nprocs)]
            for w in consumers:
                w.start()

            # Enqueue jobs
            for i in range(self.totalsites):
                self.tasks.put(i)

            # Add a poison pill for each consumer
            for i in range(nprocs):
                self.tasks.put(None)

            self.tasks.close()
            self.tasks.join()

    #       for w in consumers:
    #           w.join()
        except:
            traceback.print_exc()

_calc_parallel 调用其他一些类方法。

我什至尝试使用 multiprocessing.Pool 来实现此目的,使用 http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods 上其他地方的 copy_reg 选项。

import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _use_force(force):
        # Calculate data

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.map_async(self._calc_parallel, args)
            pool.close()
            pool.join()
        except:
            traceback.print_exc()

但是,pool.map_async 似乎没有调用 self._calc_parallel。我知道在这两种情况下(进程和池),我都忽略了一些东西,但我不太清楚是什么。我通常处理超过 40,000 个元素。

感谢您的帮助。

更新

在阅读了其他几篇文章后,我也尝试了 pathos.multiprocessing。

import pathos.multiprocessing as mp
class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _use_force(force):
        # Calculate data

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.ProcessingPool(nprocs)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.amap(lambda x: self._calc_parallel(*x), args)
        except:
            traceback.print_exc()

而且,与我之前的尝试一样,这似乎也可以在不调用该方法的情况下快速完成。

更新 2

我决定修改代码,将我的庞然大物类拆分成更小、更易于管理的组件。但是,如果我使用 pathos.multiprocessing,我会遇到与之前发布的情况不同的情况(请参阅link)。我的新代码现在有一个可用于计算的对象,然后通过其方法返回一个值。

import itertools
import pandas as pd
import pathos.multiprocessing as mp

class ForceData(object):
    def __init__(self, *args, **kwargs):
        # Setup data
        self.value = pd.DataFrame()
    def calculateBondData(self, index):
        # Calculation
        return self.value
    def calculateNonBondedData(self, index):
        # Calculation
        return self.value
    def calculateAll(self, index):
        # Because self.value is a pandas.DataFrame, changed internally
        self.calculateBondData(index)
        self.calculateNonBondedData(index)
        return self.value

class ForceMatrix(object):
    def __init__(self, *args, **kwargs):
        # Initialize data
        self._matrix = pd.DataFrame()
    def map(self, data):
        for value in data.get():
            for i, j in itertools.product(value.index, repeat=2):
                self._matrix.loc[[i], [j]] += value.values

def calculate(self, *args, **kwargs):
    # Setup initial information.
    fd = ForceData()
    matrix = ForceMatrix()
    pool = mp.ProcessingPool()
    data = pool.amap(fd.calculateAll, range(x))
    matrix.map(data, force)
    return matrix

我认为是一个单独的函数func(dataobj, force),但这似乎也无济于事。按照目前的速度,我估计单个处理器上的完整计算需要 1000 多个小时,这对于应该更快的东西来说太长了。

更新 3(2015 年 4 月 30 日)

由于@MikeMcKerns 有用的见解,我可能已经确定了一个可能的解决方案。在 iMac(四核)或集群的 16 核节点上,我发现对于没有键的粗粒度(CG)系统,双 itertools.imap 似乎是我最好的解决方案(1000 CG站点)以每个轨迹帧大约 5.2 秒的速度进入。当我进入一个包含一些键细节的系统(3000 个代表水的 CG 站点)时,我发现在 iMac(使用 1 个内核)上,itertools.imap 后跟 pathos.ThreadingPool.uimap(4 个线程)的时钟频率约为 85 秒/框架;如果我按照@MikeMcKerns 在 cmets 中的建议尝试进程池(4 核 x 2)/线程池(4 线程),计算时间增加了 2.5 倍。在 16 核集群(32 pp/16 tp)上,这个 CG 系统运行速度也很慢(大约 160 秒/帧)。在 iMac(1 核/4 线程)上具有 42,778 个站点和众多键的 CG 系统可能会以大约 58 分钟/帧的速度运行。我还没有在集群的 16 核节点上测试这个大型系统,但我不确定是否使用进程池/线程池来进一步加快速度。

例子:

# For a CG system with no bond details
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = itertools.imap(func2, data1)
    for values in data2:
        func3(values)

# For a system with bond details
import pathos.multiprocessing as mp

tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

# Seems to be the slowest in the bunch on iMac and possibly on 16-cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = ppool.uimap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

我怀疑系统越大,我从多处理中获得的好处就越多。我知道大型 CG 系统(42,778 个站点)大约需要 0.08 秒/站点,而 0.02 秒/站点(3000 个 CG 站点)或 0.05 秒/站点(1000 个站点无键)。

在我努力减少计算时间的过程中,我发现了可以减少一些计算的区域(例如,global 变量和算法更改),但如果我可以通过全面的多线程处理进一步减少它,那就太好了。

【问题讨论】:

如果您可以使示例代码最小化(例如,删除大多数与意外行为无关的方法主体),这将使您更轻松 @tobyodavies,我可以做到。我最初把它缩小了,但认为有人可能想知道其余的。我绝对可以删除一些。谢谢。 您似乎期望子进程在父进程中产生副作用。那正确吗?也就是说,子进程以某种方式修改自身,而不是返回其计算数据。您无法通过多处理来做到这一点,并且需要重新考虑您的解决方案。如果是这种情况,那么我将向您展示如何执行此操作的最小工作示例。 @Dunes,所以您是说我应该从每个方法返回值然后进行最终处理,而不是使用托管变量(mp.Manager.dict 等)?在我的方法中,我传递的参数之一是 pandas.DataFrame,但如果我想收集它,我也有返回相同 DataFrame 的位置,但我的一种方法处理数据并将其存储在托管数据结构中。但是,我想这是不正确的想法。感谢您提供的任何见解。 嗨蒂姆,我是pathos作者。你似乎在调用pool.amap,它应该返回一个结果对象result,你没有'保存在任何地方。然后您需要调用result.get() 来获得结果。还有imap,它返回一个迭代器,以及普通的旧map,它直接返回一个计算值列表。 amap 是异步的,所以它不应该阻塞在 map 调用上——它会阻塞在 get 处。如果您想在地图上阻止,请使用map 【参考方案1】:

如果您使用的是 python 2.7,您的选择相当有限

这是一个使用池参数调用对象方法的简短示例。

第一个问题是只有在模块顶层定义的函数才能被pickle。基于 Unix 的系统有办法绕过这个限制,但不应该依赖这个。所以你必须定义一个函数来获取你想要的对象和调用相关方法所需的参数。

例如:

def do_square(args):
    squarer, x = args # unpack args
    return squarer.square(x)

Squarer 类可能如下所示:

class Squarer(object):
    def square(self, x):
        return x * x

现在并行应用平方函数。

if __name__ == "__main__":
    # all pool work must be done inside the if statement otherwise a recursive 
    # cycle of Pool spawning will be created.

    pool = Pool()
    sq = Squarer()
    # create args as a sequence of tuples
    args = [(sq, i) for i in range(10)]
    print pool.map(do_square, args)

    pool.close()
    pool.join()

请注意,平方器不是有状态的。它接收数据、处理数据并返回结果。这是因为孩子的状态与父母的状态是分开的。除非使用多处理提供的队列、管道或其他共享状态类,否则其中一个的更改不会反映在另一个中。一般来说,最好从子进程返回计算数据,然后在父进程中处理该数据,而不是尝试将数据存储在子进程可以访问的某个共享变量中。

有状态平方器不支持多处理的示例:

class StatefulSquarer(object):

    def __init__(self):
        self.results = []

    def square(self, x):
        self.results.append(x * x)

if __name__ == "__main__":

    print("without pool")
    sq = StatefulSquarer()
    map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    print("with pool")
    pool = Pool()
    sq = StatefulSquarer()
    pool.map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    pool.close()
    pool.join()

如果您想完成这项工作,那么更好的解决方案是:

for result in pool.map(do_square, [(sq, i) for i in range(10)]):
    sq.results.append(result)

如果你的类很大,但不可变怎么办。每次在 map 中启动新任务时,都必须将这个巨大的对象复制到子进程中。但是,您可以通过仅将其复制到子进程一次来节省时间。

from multiprocessing import Pool

def child_init(sq_):
    global sq
    sq = sq_

def do_square(x):
    return sq.square(x)

class Squarer(object):
    def square(self, x):
        return x * x

if __name__ == "__main__":
    sq = Squarer()
    pool = Pool(initializer=child_init, initargs=(sq,))

    print(pool.map(do_square, range(10)))

    pool.close()
    pool.join()

【讨论】:

如何通过在我的示例代码中调用类中的 Pool 来工作?我从一种方法中调用 Pool 来调用另一种方法。我读到 pathos 可能会提供一个解决方案,但是,正如我在更新中指出的那样,这似乎不起作用。将 multiprocessing.Process 与 JoinableQueue 一起使用会更好吗?如果是这样,我该如何避免失效进程问题? 如果您一直关注我与 Mike 的讨论并阅读了我的最新更新,您会注意到我可能已经找到了解决方案。但是,我想让您知道您的解决方案很有帮助。这个游戏让我对我正在从事的另一个项目有一些想法,虽然你的回答可能不是我当前问题的解决方案,但它可能适用于另一个项目。再次感谢您的回答;我很感激。

以上是关于Python multiprocessing.Pool 忽略类方法的主要内容,如果未能解决你的问题,请参考以下文章

Python代写,Python作业代写,代写Python,代做Python

Python开发

Python,python,python

Python 介绍

Python学习之认识python

python初识