在不耗尽 RAM 的情况下使用并发期货

Posted

技术标签:

【中文标题】在不耗尽 RAM 的情况下使用并发期货【英文标题】:Using Concurrent Futures without running out of RAM 【发布时间】:2016-04-18 15:11:10 【问题描述】:

我正在做一些文件解析,这是一个 CPU 密集型任务。无论我在这个过程中扔了多少文件,它使用的 RAM 都不超过 50MB。 该任务是可并行的,我已将其设置为使用下面的并发期货将每个文件解析为单独的进程:

    from concurrent import futures
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary which will contain a list the future info in the key, and the filename in the value
        jobs = 

        # Loop through the files, and run the parse function for each file, sending the file-name to it.
        # The results of can come back in any order.
        for this_file in files_list:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            # Send the result of the file the job is based on (jobs[job]) and the job (job.result)
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)

问题是,当我使用期货运行它时,RAM 使用量猛增,不久我就用完了,Python 崩溃了。这可能在很大程度上是因为 parse_function 的结果大小为几 MB。一旦结果通过post_processing,应用程序就不再需要它们。如您所见,我正在尝试del jobs[job]jobs 中清除项目,但这没有任何区别,内存使用量保持不变,并且似乎以相同的速度增加。

我还确认这不是因为它仅使用单个进程在等待post_process 函数,并抛出time.sleep(1)

futures 文档中没有关于内存管理的任何内容,虽然简短的搜索表明它之前已经出现在 future 的实际应用中(Clear memory in python loop 和 http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures) - 答案并没有转化为我的使用-case(他们都关心超时等)。

那么,如何? (Python 3.5)

【问题讨论】:

【参考方案1】:

我会试一试(可能猜错了……)

您可能需要一点一点地提交您的工作,因为每次提交时您都在制作 parser_variables 的副本,这可能最终会占用您的 RAM。

这是有趣的部分带有“

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary which will contain a list the future info in the key, and the filename in the value
    jobs = 

    # Loop through the files, and run the parse function for each file, sending the file-name to it.
    # The results of can come back in any order.
    files_left = len(files_list) #<----
    files_iter = iter(files_list) #<------

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break #limit the job submission for now job

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            files_left -= 1 #one down - many to go...   <---

            # Send the result of the file the job is based on (jobs[job]) and the job (job.result)
            results_list = job.result()
            this_file = jobs[job]

            # delete the result from the dict as we don't need to store it.
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break; #give a chance to add more jobs <-----

【讨论】:

很好的答案,谢谢。这很好地解决了它,峰值 RAM 使用量飙升至约 140MB;我从不认为输入是问题(你是对的,它们也非常大)。 (那是在花了 20 分钟想知道为什么你的不是真正的多处理之后,你过度缩进了 for job in... 行,所以它是 for this_file in... 的子代(现在更正)。注意 Python设计师:关键语法的隐形字符不是一个好主意! @GIS-Jonathan - 此外,futures.as_completed() 在内部复制了它正在执行的期货。如果parse_function 可以接受并返回文件名jobs 可以在调用as_completed 后立即删除,并且垃圾收集可以在as_completed 及其助手de-referenced后立即取消它> 它。在我看来就是这样,不确定是否有任何实际改进,接受可能在整个过程中将未来及其(文件)名称保持在一起。【参考方案2】:

尝试像这样将del 添加到您的代码中:

for job in futures.as_completed(jobs):
    del jobs[job]  # or `val = jobs.pop(job)`
    # del job  # or `job._result = None`

【讨论】:

这对我有用,内存使用再次稳定。看起来在完成时取消引用每个未来是使用期货时内存管理的关键。之后我还做了一个gc.collect() 以确保。【参考方案3】:

对我来说同样的问题。

就我而言,我需要启动数百万个线程。对于 python2,我会使用 dict 自己编写一个线程池。但是在python3中,当我动态地完成线程时,我遇到了以下错误:

RuntimeError: dictionary changed size during iteration

所以我必须使用concurrent.futures,一开始我是这样编码的:

from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    with ThreadPoolExecutor(max_workers=50) as pool:
        for r in all_resouces:
            pool.submit(handle_resource, *args)

但很快内存就会耗尽,因为只有在所有线程完成后才会释放内存。我需要在许多线程开始之前删除已完成的线程。所以我在这里阅读了文档:https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

发现 Executor.shutdown(wait=True) 可能是我需要的。 这是我的最终解决方案:

from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    i = 0
    while i < len(all_resouces):
        with ThreadPoolExecutor(max_workers=50) as pool:
            for r in all_resouces[i:i+1000]:
                pool.submit(handle_resource, *args)
            i += 1000

如果你使用 with 语句,你可以避免显式调用这个方法,这将关闭 Executor(等待就像 Executor.shutdown() 被调用且等待设置为 True)

【讨论】:

尽管使用了ProcessPoolExecutor,这点因为只有在所有线程完成后才会释放内存。实际上是关键。我有 40K+ 任务要做,每个任务大约需要 2 MB,这...爆炸了我的 RAM【参考方案4】:

查看concurrent.futures.as_completed() 函数,我了解到它足以确保不再有任何对未来的引用。如果您在获得结果后立即分配此引用,您将最大限度地减少内存使用量。

我使用生成器表达式来存储我的 Future 实例,因为我关心的所有内容都已由 future 在其结果中返回(基本上是已调度工作的状态)。其他实现使用 dict 例如在您的情况下,因为您不会将输入文件名作为线程工作者结果的一部分返回。

使用生成器表达式意味着一旦产生结果,就不再有任何对Future 的引用。在内部,as_completed() 在将完整的Future 生成给您之后,已经负责删除自己的引用。

futures = (executor.submit(thread_worker, work) for work in workload)

for future in concurrent.futures.as_completed(futures):
    output = future.result()
    ...  # on next loop iteration, garbage will be collected for the result data, too

编辑:从使用 set 和删除条目简化为仅使用生成器表达式。

【讨论】:

一个更简单的解决方案是使用 generator 而不是 set。那么就不需要删除任何东西了。换句话说,futures = (executor.submit(thread_worker, work) for work in workload)

以上是关于在不耗尽 RAM 的情况下使用并发期货的主要内容,如果未能解决你的问题,请参考以下文章

在不耗尽内存的情况下检索图像

如何在不耗尽电池的情况下监控 MPMoviePlayerController 播放进度?

如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?

Google Fit 应用程序如何在不耗尽电池电量的情况下一直测量步数?

如何在不耗尽内存的情况下制作大型 3D 数组?

在不耗尽电池的情况下以良好的准确性获取位置更新的最佳方法