使用多处理从列表中删除项目

Posted

技术标签:

【中文标题】使用多处理从列表中删除项目【英文标题】:using multiprocessing to remove items from a list 【发布时间】:2020-11-16 08:08:31 【问题描述】:

我需要获取大量列表并删除“不合适”的列表。

当使用Pool.apply_async时,任务管理器声称只使用了大约 10% 的 cpu 和 97% 的内存,整个过程需要很长时间。 我对此不是很了解,但如果我使用所有内核,我觉得它应该使用超过 10% 的 cpu。 所以我的问题如下:

    Pool.apply_sync 是实现我目标的最佳方式吗?每次我想通过回调删除项目时,我都觉得回到主流程会增加太多时间/开销。 是什么导致了内存的过度使用?

这是我的代码示例,使用较小的列表进行演示

w_list = [[1, 0, 1], [1, 1, 0], [1, 1, 1]]
budget = 299
cost = [100, 100, 100]

def cost_interior(w):

    total_cost = 0
    for item in range(0, len(w)):
        if w[item] == 1:
            total_cost = total_cost + cost[item]

    if total_cost > budget or total_cost < (0.5 * budget):
        w_list.remove(w)

def remove_unfit(unfit):
    if unfit is not None:
        w_list.remove(unfit)

if __name__ == "__main__":

    p = Pool(2)
    for w in w_list:
        p.apply_async(cost_interior, args=(w,), callback=remove_unfit)

    p.close()
    p.join()

    print(w_list)

【问题讨论】:

如果你有一个“海量列表”,为什么会惊讶于它占用了大量内存?跨 CPU 协调工作可能会增加更多开销,而根本不会减少处理时间。 所以你想为庞大列表的每个元素启动一个单独的“并行”过程???不是一个很好的主意。顺便说一句:进程还是线程? 看看this QA和this QA 【参考方案1】:

通过使用Pool.map(function, iterable),您将获得更好的性能,它将可迭代对象(在本例中为w_list)分成多个块并将函数应用于每个块,每个块有一个线程。

一个更关键的优化是不要重复调用列表中的remove(),因为这是一项非常昂贵的操作。相反,我们可以先存储要删除的索引列表,然后创建一个新列表。

我已经测试了以下代码,它的运行速度似乎比单线程快得多(大约 3-4 倍)(您可以取消注释 process_pool = mp.Pool(1) 以查看差异)。

import multiprocessing as mp

def cost_interior(w):
    budget = 299
    cost = [100, 100, 100]
    total_cost = 0
    for item in range(0, len(w)):
        if w[item] == 1:
            total_cost = total_cost + cost[item]
    if total_cost > budget or total_cost < (0.5 * budget):
        return True
    return False


def main():
    process_pool = mp.Pool(mp.cpu_count())
    #process_pool = mp.Pool(1)
    w_list = [[1, 0, 1], [1, 1, 0], [1, 1, 1]]
    w_list = w_list*1000000
    should_remove = process_pool.map(cost_interior, w_list)
    process_pool.close()
    process_pool.join()
    should_remove_indices = set()
    for i in range(len(w_list)):
        if should_remove[i]:
            should_remove_indices.add(i)
    w_list_new = []
    for i in range(len(w_list)):
        if i not in should_remove_indices:
            w_list_new.append(w_list[i])

if __name__ == "__main__":
    main()

【讨论】:

【参考方案2】:

不幸的是,可能没有一个好的方法来做到这一点。

您在使用 python 多处理时遇到的问题是它通过创建一个附加进程池来工作。这些进程是原始进程的副本,因此您通常会得到 NUM_PROCS 个数据副本,每个进程 1 个。这里有一些注意事项,但如果您发现自己的内存大幅增加,这可能是由于您的数据有额外的副本。

另外,为了让python在进程之间进行通信,它需要序列化你的参数,将它传递给worker,然后将响应序列化回来。在您上面的示例中,在工作人员中进行处理所需的时钟周期非常少。腌制数据并发送数据可能比实际工作人员处理花费的时间更长。如果您没有看到处理时间随着池大小的增加而减少,这可能是正在发生的事情。

您可以尝试以不同的方式分解代码,看看是否可以让某些东西起作用,但是,鉴于上面的示例,我认为您不太可能获得加速。您可以尝试几种不同的池函数(我喜欢 pool.imap),但它们的根本问题是相同的。

您可以在线阅读有关多处理和全局解释器锁的问题。我发现 python 多处理在子任务需要一段时间时非常有用,但对于非常小的任务,开销太高了。

【讨论】:

以上是关于使用多处理从列表中删除项目的主要内容,如果未能解决你的问题,请参考以下文章

从 C 中的双向链接列表中删除特定项目

我想从列表视图中删除一个项目

从过滤器列表和原始列表中删除项目

使用 LINQ 从列表中删除特定项目

如何在使用 range() 函数从列表中删除项目时迭代列表? [复制]

使用实体框架从 ID 列表中删除多个项目