内存使用量随着 Python 的 multiprocessing.pool 不断增长

Posted

技术标签:

【中文标题】内存使用量随着 Python 的 multiprocessing.pool 不断增长【英文标题】:Memory usage keep growing with Python's multiprocessing.pool 【发布时间】:2013-08-27 03:58:30 【问题描述】:

这是程序:

#!/usr/bin/python

import multiprocessing

def dummy_func(r):
    pass

def worker():
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    for index in range(0,100000):
        pool.apply_async(worker, callback=dummy_func)

    # clean up
    pool.close()
    pool.join()

我发现内存使用量(VIRT 和 RES)一直在增长,直到 close()/join(),有什么解决方案可以解决这个问题吗?我尝试使用 2.7 的 maxtasksperchild,但它也没有帮助。

我有一个更复杂的程序,它调用 apply_async() 约 6M 次,并且在约 1.5M 点我已经获得了 6G+ RES,为避免所有其他因素,我将程序简化为上述版本。

编辑:

原来这个版本效果更好,感谢大家的意见:

#!/usr/bin/python

import multiprocessing

ready_list = []
def dummy_func(index):
    global ready_list
    ready_list.append(index)

def worker(index):
    return index

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    result = 
    for index in range(0,1000000):
        result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
        for ready in ready_list:
            result[ready].wait()
            del result[ready]
        ready_list = []

    # clean up
    pool.close()
    pool.join()

我没有把任何锁放在那里,因为我相信主进程是单线程的(回调或多或少像我阅读的文档中的事件驱动的东西)。

我将 v1 的索引范围更改为 1,000,000,与 v2 相同并进行了一些测试 - 对我来说很奇怪 v2 甚至比 v1 快约 10%(33 秒对 37 秒),也许 v1 做了太多内部列表维护工作。 v2在内存使用上绝对是赢家,它从未超过300M(VIRT)和50M(RES),而v1曾经是370M/120M,最好的是330M/85M。所有数字仅为 3~4 次测试,仅供参考。

【问题讨论】:

这里只是推测,但是排队一百万个对象会占用空间。也许批处理它们会有所帮助。文档不是确定的,但example(搜索测试回调)显示正在等待 apply_async 结果,即使有回调也是如此。清除结果队列可能需要等待。 所以 multiprocessing.pool 可能不适合我,因为回调实际上不做清理工作,是否可以在回调中进行清理?问题是,apply_async() 调用后我无法等待,因为在现实世界中,worker() 每个请求大约需要 0.1 秒(几个 HTTP 请求)。 大胆猜测:apply_asynch 创建了一个 AsynchResult 实例。 Pool 可能对这些对象有一些引用,因为它们必须能够在计算完成时返回结果,但在您的循环中,您只是将它们扔掉。也许你应该在某个时候在异步结果上调用get()wait(),也许使用apply_asynchcallback 参数。 我认为当您覆盖 ready_list 时,EDIT 版本存在竞争条件。有一个线程处理来自AsyncResults (docs.python.org/2/library/…) 的结果并且该线程调用回调。仅仅因为您丢弃了结果,它可能会更快。此外,使用带有小的随机延迟的 time.sleep() 来模拟工作,并在代码中添加睡眠以捕捉竞争条件。 maxtasksperchild似乎已经修复了3.7上apply_async造成的内存泄漏问题。 【参考方案1】:

我最近遇到了内存问题,因为我多次使用多处理功能,所以它不断产生进程,并将它们留在内存中。

这是我现在使用的解决方案:

def myParallelProcess(ahugearray):
    from multiprocessing import Pool
    from contextlib import closing
    with closing(Pool(15)) as p:
        res = p.imap_unordered(simple_matching, ahugearray, 100)
    return res

【讨论】:

在这个问题上花了几天时间解决了我的问题!非常感谢!我在一个循环中创建了一个池,所以我最终产生了太多的进程,每个进程都消耗了大量的内存并且永远不会退出。我只需要在循环结束时执行 mypool.close() with Pool 不会自动关闭吗? 你能解释一下“simple_matching”和“100”的用法吗?【参考方案2】:

只需在循环中创建池并在循环结束时将其关闭 pool.close().

【讨论】:

虽然创建池很昂贵【参考方案3】:

使用map_async 而不是apply_async 以避免过多的内存使用。

对于您的第一个示例,更改以下两行:

for index in range(0,100000):
    pool.apply_async(worker, callback=dummy_func)

pool.map_async(worker, range(100000), callback=dummy_func)

它会在一瞬间完成,然后您才能在 top 中看到它的内存使用情况。将列表更改为更大的列表以查看差异。但请注意map_async 将首先将您传递给它的可迭代对象转换为列表以计算其长度(如果它没有__len__ 方法)。如果您有一个包含大量元素的迭代器,您可以使用itertools.islice 将它们分成更小的块来处理。

我在一个包含更多数据的实际程序中遇到了内存问题,最后发现罪魁祸首是apply_async

P.S.,在内存使用方面,你的两个例子没有明显区别。

【讨论】:

您能否告诉我们为什么map_async 不会引起与apply_async 相同的内存问题?【参考方案4】:

我正在处理一个非常大的 3d 点云数据集。我尝试使用多处理模块来加快处理速度,但我开始出现内存不足错误。经过一些研究和测试后,我确定我正在填充要处理的任务队列,比子流程清空它的速度要快得多。我确定通过分块或使用 map_async 或其他可以调整负载的方法,但我不想对周围的逻辑进行重大更改。

我遇到的愚蠢的解决方案是间歇性地检查pool._cache的长度,如果缓存太大,则等待队列清空。

在我的主循环中,我已经有了一个计数器和一个状态码:

# Update status
count += 1
if count%10000 == 0:
    sys.stdout.write('.')
    if len(pool._cache) > 1e6:
        print "waiting for cache to clear..."
        last.wait() # Where last is assigned the latest ApplyResult

因此,每 10k 插入池中,我检查是否有超过 100 万个操作排队(主进程中使用了大约 1G 的内存)。当队列已满时,我只需等待最后插入的作业完成。

现在我的程序可以运行几个小时而不会耗尽内存。主进程只是偶尔暂停,而工作人员继续处理数据。

顺便说一句,_cache 成员记录在多处理模块池示例中:

#
# Check there are no outstanding tasks
#

assert not pool._cache, 'cache = %r' % pool._cache

【讨论】:

【参考方案5】:

我认为这类似于the question I posted,但我不确定您是否有相同的延迟。我的问题是我从多处理池中产生结果的速度比我消耗它们的速度要快,所以它们在内存中建立起来。为了避免这种情况,我使用semaphore 来限制池中的输入,这样它们就不会比我消耗的输出领先太多。

【讨论】:

以上是关于内存使用量随着 Python 的 multiprocessing.pool 不断增长的主要内容,如果未能解决你的问题,请参考以下文章

netcdf4-python:随着从 netcdf 对象多次调用切片数据,内存增加

Python multiprocessing.Pool:AttributeError

python进程编程

二 python并发编程之多进程实现

Python:内存泄漏调试

Python 学习2