为大型itertools产品实现内存高效的ThreadPool

Posted

技术标签:

【中文标题】为大型itertools产品实现内存高效的ThreadPool【英文标题】:Implementation of memory-efficient ThreadPool for large-scale itertools product 【发布时间】:2021-10-28 20:01:55 【问题描述】:

我有一个包含 15 个变量的字典,每个变量有 3 个值,我需要为此生成所有可能组合的乘积(3**15 = 14.3M 组合)。我正在使用带有 12 核处理器的多线程来处理组合(可能会跳转到 64 核)。

我使用itertools.product 来生成不同的组合,并使用ThreadPoolimap_unordered 来运行多处理。此外,我正在使用deque 来删除可用的结果。但是,我发现内存消耗高达 2.5GB 左右。我知道itertools.product 是可迭代的,因此不应在内存中存储太多数据,但似乎并非如此。

以下是我的代码,我想知道是否有人可以帮助我弄清楚如何更好地优化内存利用率。

此外,我想知道imap_unordered 中的块大小如何影响内存效率。我尝试了不同的数字来查看它如何影响内存使用(包括 10、100、1000、10000),但除了将内存利用率稳定在 2.5GB 左右之外,它似乎没有太大影响。如果我不包括块大小,内存往往会爆炸 >5GB。

我还尝试将线程数从 12 更改为 1,这也没有影响内存使用。但是,使用单处理器实现(下面注释掉)将内存使用量减少到仅约 30MB。

import numpy as np
import itertools
import multiprocessing
import queue
import functools
from multiprocessing import pool, dummy

def dummy_func(values, keys):
    print( dict(zip(keys, values)) )
    return

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = 'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'], 
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'], 
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'], 
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'], 
                  'i': ['7p', '16p', '22p'], 
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'], 
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'], 
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # process simulations for all permutations using multi-threading
    with multiprocessing.pool.ThreadPool(num_threads) as workers:
        queue.deque(workers.imap_unordered(functools.partial(dummy_func, keys=keys), 
                                           itertools.product(*map(parameters.get, keys)), 100))
    return

if __name__ == "__main__":
    main()

【问题讨论】:

你为什么要在一个双端队列中实现所有结果呢?这会产生您试图避免的内存问题。听起来您可能误解了双端队列的概念。 也许您看到 collections.deque(iterator, maxlen=0) 曾经在某处使用迭代器,但没有意识到 maxlen 参数的作用。 这里有一些关于 chunksize 做什么的信息。 您的工作函数 dummy_func 正在返回 None,这就是您一遍又一遍地添加到双端队列中的内容,因此它会变得非常大。这有什么意义?无论如何,尝试为 deque 构造函数指定一个合理的 maxlen 参数,以将大小限制为 maxlen 元素。 @martineau chunksize 影响速度而不是空间。 【参考方案1】:

更新

如果你不想炸毁内存,你需要做 3 件事:

    您需要有一个 iterable 来生成您的值,并传递给 dummy_func 以增量方式生成值。 itertools.product 实际上在产生第一个值之前会在内存中生成所有值,因此无论您做什么,它都会炸毁内存。 您必须使用一个函数来逐个处理 iterable,并为每个结果将结果附加到使用合适的非零 maxlen 参数初始化的 deque .您当前的代码正在使用 map 函数的完整输出初始化 deque,该函数将具有传递的 iterable 的长度。这会破坏记忆。 即使您为工作函数 dummy_func 生成值,增量使用 imap,您生成任务的速度也可能比生成结果的速度快,因此池的输入队列将继续增长,您将内存爆炸。

为了克服1中描述的问题。我正在使用permutations生成器函数。

为了克服 2 中描述的问题。我使用 maxlen=10 初始化了一个空双端队列。由于每个值都是从dumy_func 返回的,所以我会将其附加到双端队列。

要克服 3. 中描述的问题,您需要使用 BoundedQueueProcessPoolBoundedQueueThreadPool 类。它使用imap 方法提交带有回调函数的新任务来处理结果。它与标准池函数的不同之处在于,一旦输入队列大小达到池中的进程数或线程数视情况而定(您可以手动指定最大队列大小),它默认会阻止主线程提交更多任务max_waiting_tasks 参数):

import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps, partial

name = 'bounded_pool'

class ImapResult():
    def __init__(self, semaphore, result):
        self.semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = self.it.__next__()
            self.semaphore.release()
            return elem
        except StopIteration:
            raise
        except:
            self.semaphore.release()
            raise

class BoundedQueuePool:
    def __init__(self, semaphore):
        self.semaphore = semaphore

    def release(self, result, callback=None):
        self.semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds=, callback=None, error_callback=None):
        self.semaphore.acquire()
        callback_fn = self.release if callback is None else partial(self.release, callback=callback)
        error_callback_fn = self.release if error_callback is None else partial(self.release, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self.semaphore.acquire()
                yield elem
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self.semaphore.acquire()
                yield elem
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: max_waiting_tasks')
        BoundedQueuePool.__init__(self, multiprocessing.BoundedSemaphore(self._processes + max_waiting_tasks))

class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: max_waiting_tasks')
        BoundedQueuePool.__init__(self, threading.BoundedSemaphore(self._processes + max_waiting_tasks))

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

##################################################################

import queue
from itertools import permutations

def dummy_func(values, keys):
    #print( dict(zip(keys, values)))
    ...
    return dict(zip(keys, values))

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = 'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
                  

    # A more reasonably sized parameters:
    parameters = 'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  


    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    q = queue.deque(maxlen=10)

    pool = BoundedQueueThreadPool(num_threads)
    for v in pool.imap(partial(dummy_func, keys=keys), permutations(parameters.values(), len(keys))):
        q.append(v)
    return q

if __name__ == '__main__':
    import time
    t = time.time()
    q = main()
    print(q)
    print(time.time() - t)

【讨论】:

感谢您的详细回复。但是,需要明确的是,虚拟 _func 的目的是对数据执行一些任意任务。目的不是返回或保留数据以供以后打印。这将需要存储数据。我还尝试了 dequeue (0,1,10,100,1000) 的各种 maxlen 设置,但这些设置并没有影响内存或性能。 您的原始函数隐式返回了None(我不知道为什么您甚至为此使用了不必要的return 语句)。然后告诉我,通过一个看似(对我而言)存储语义上无意义的值的双端队列来炸毁内存的目的是什么。我一直在怀疑这样做是有目的的,只是假设由于您可能的一些误解或心不在焉,您忘记了返回结果。 您设置的 maxlen 值不会影响内存的原因是因为在 itertools.product 方法产生其所有值之前不会构造双端队列,而这本身就需要占了很多内存。在我上面的解决方案中,至少在使用imap 时的intent 是对itertools.product 进行lazy 评估,并在元素可用时将它们一一添加到双端队列中.然而,事实证明itertools.product 本质上是先在内存中构建所有结果,然后再逐个生成值,这就是为什么即使使用imap 内存也会爆炸的原因。

以上是关于为大型itertools产品实现内存高效的ThreadPool的主要内容,如果未能解决你的问题,请参考以下文章

itertools 高效的循环

python自带性能强悍的标准库 itertools

python itertools 模块讲解

python值itertools模块

python itertools 用法

python itertools 模块