为大型itertools产品实现内存高效的ThreadPool
Posted
技术标签:
【中文标题】为大型itertools产品实现内存高效的ThreadPool【英文标题】:Implementation of memory-efficient ThreadPool for large-scale itertools product 【发布时间】:2021-10-28 20:01:55 【问题描述】:我有一个包含 15 个变量的字典,每个变量有 3 个值,我需要为此生成所有可能组合的乘积(3**15 = 14.3M 组合)。我正在使用带有 12 核处理器的多线程来处理组合(可能会跳转到 64 核)。
我使用itertools.product
来生成不同的组合,并使用ThreadPool
和imap_unordered
来运行多处理。此外,我正在使用deque
来删除可用的结果。但是,我发现内存消耗高达 2.5GB 左右。我知道itertools.product
是可迭代的,因此不应在内存中存储太多数据,但似乎并非如此。
以下是我的代码,我想知道是否有人可以帮助我弄清楚如何更好地优化内存利用率。
此外,我想知道imap_unordered
中的块大小如何影响内存效率。我尝试了不同的数字来查看它如何影响内存使用(包括 10、100、1000、10000),但除了将内存利用率稳定在 2.5GB 左右之外,它似乎没有太大影响。如果我不包括块大小,内存往往会爆炸 >5GB。
我还尝试将线程数从 12 更改为 1,这也没有影响内存使用。但是,使用单处理器实现(下面注释掉)将内存使用量减少到仅约 30MB。
import numpy as np
import itertools
import multiprocessing
import queue
import functools
from multiprocessing import pool, dummy
def dummy_func(values, keys):
print( dict(zip(keys, values)) )
return
def main():
num_threads = multiprocessing.cpu_count()
parameters = 'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
'i': ['7p', '16p', '22p'],
'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
keys = list(parameters)
# process simulations for all permutations using single process
#for values in itertools.product(*map(parameters.get, keys)):
# dummy_func(values, keys)
# process simulations for all permutations using multi-threading
with multiprocessing.pool.ThreadPool(num_threads) as workers:
queue.deque(workers.imap_unordered(functools.partial(dummy_func, keys=keys),
itertools.product(*map(parameters.get, keys)), 100))
return
if __name__ == "__main__":
main()
【问题讨论】:
你为什么要在一个双端队列中实现所有结果呢?这会产生您试图避免的内存问题。听起来您可能误解了双端队列的概念。 也许您看到collections.deque(iterator, maxlen=0)
曾经在某处使用迭代器,但没有意识到 maxlen
参数的作用。
这里有一些关于 chunksize 做什么的信息。
您的工作函数 dummy_func
正在返回 None
,这就是您一遍又一遍地添加到双端队列中的内容,因此它会变得非常大。这有什么意义?无论如何,尝试为 deque 构造函数指定一个合理的 maxlen 参数,以将大小限制为 maxlen
元素。
@martineau chunksize
影响速度而不是空间。
【参考方案1】:
更新
如果你不想炸毁内存,你需要做 3 件事:
-
您需要有一个 iterable 来生成您的值,并传递给
dummy_func
以增量方式生成值。 itertools.product
实际上在产生第一个值之前会在内存中生成所有值,因此无论您做什么,它都会炸毁内存。
您必须使用一个函数来逐个处理 iterable,并为每个结果将结果附加到使用合适的非零 maxlen 参数初始化的 deque
.您当前的代码正在使用 map
函数的完整输出初始化 deque
,该函数将具有传递的 iterable 的长度。这会破坏记忆。
即使您为工作函数 dummy_func
生成值,增量使用 imap
,您生成任务的速度也可能比生成结果的速度快,因此池的输入队列将继续增长,您将内存爆炸。
为了克服1中描述的问题。我正在使用permutations
生成器函数。
为了克服 2 中描述的问题。我使用 maxlen=10 初始化了一个空双端队列。由于每个值都是从dumy_func
返回的,所以我会将其附加到双端队列。
要克服 3. 中描述的问题,您需要使用 BoundedQueueProcessPool
或 BoundedQueueThreadPool
类。它使用imap
方法提交带有回调函数的新任务来处理结果。它与标准池函数的不同之处在于,一旦输入队列大小达到池中的进程数或线程数视情况而定(您可以手动指定最大队列大小),它默认会阻止主线程提交更多任务max_waiting_tasks 参数):
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps, partial
name = 'bounded_pool'
class ImapResult():
def __init__(self, semaphore, result):
self.semaphore = semaphore
self.it = result.__iter__()
def __iter__(self):
return self
def __next__(self):
try:
elem = self.it.__next__()
self.semaphore.release()
return elem
except StopIteration:
raise
except:
self.semaphore.release()
raise
class BoundedQueuePool:
def __init__(self, semaphore):
self.semaphore = semaphore
def release(self, result, callback=None):
self.semaphore.release()
if callback:
callback(result)
def apply_async(self, func, args=(), kwds=, callback=None, error_callback=None):
self.semaphore.acquire()
callback_fn = self.release if callback is None else partial(self.release, callback=callback)
error_callback_fn = self.release if error_callback is None else partial(self.release, callback=error_callback)
return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)
def imap(self, func, iterable, chunksize=1):
def new_iterable(iterable):
for elem in iterable:
self.semaphore.acquire()
yield elem
result = super().imap(func, new_iterable(iterable), chunksize)
return ImapResult(self.semaphore, result)
def imap_unordered(self, func, iterable, chunksize=1):
def new_iterable(iterable):
for elem in iterable:
self.semaphore.acquire()
yield elem
result = super().imap_unordered(func, new_iterable(iterable), chunksize)
return ImapResult(self.semaphore, result)
class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
def __init__(self, *args, max_waiting_tasks=None, **kwargs):
multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
if max_waiting_tasks is None:
max_waiting_tasks = self._processes
elif max_waiting_tasks < 0:
raise ValueError(f'Invalid negative max_waiting_tasks value: max_waiting_tasks')
BoundedQueuePool.__init__(self, multiprocessing.BoundedSemaphore(self._processes + max_waiting_tasks))
class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
def __init__(self, *args, max_waiting_tasks=None, **kwargs):
multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
if max_waiting_tasks is None:
max_waiting_tasks = self._processes
elif max_waiting_tasks < 0:
raise ValueError(f'Invalid negative max_waiting_tasks value: max_waiting_tasks')
BoundedQueuePool.__init__(self, threading.BoundedSemaphore(self._processes + max_waiting_tasks))
def threadpool(pool):
def decorate(f):
@wraps(f)
def wrap(*args, **kwargs):
return pool.apply_async(f, args, kwargs)
return wrap
return decorate
def processpool(pool):
def decorate(f):
@wraps(f)
def wrap(*args, **kwargs):
return pool.apply_async(f, args, kwargs)
return wrap
return decorate
##################################################################
import queue
from itertools import permutations
def dummy_func(values, keys):
#print( dict(zip(keys, values)))
...
return dict(zip(keys, values))
def main():
num_threads = multiprocessing.cpu_count()
parameters = 'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
'i': ['7p', '16p', '22p'],
'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
# A more reasonably sized parameters:
parameters = 'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
'i': ['7p', '16p', '22p'],
'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
keys = list(parameters)
# process simulations for all permutations using single process
#for values in itertools.product(*map(parameters.get, keys)):
# dummy_func(values, keys)
q = queue.deque(maxlen=10)
pool = BoundedQueueThreadPool(num_threads)
for v in pool.imap(partial(dummy_func, keys=keys), permutations(parameters.values(), len(keys))):
q.append(v)
return q
if __name__ == '__main__':
import time
t = time.time()
q = main()
print(q)
print(time.time() - t)
【讨论】:
感谢您的详细回复。但是,需要明确的是,虚拟 _func 的目的是对数据执行一些任意任务。目的不是返回或保留数据以供以后打印。这将需要存储数据。我还尝试了 dequeue (0,1,10,100,1000) 的各种 maxlen 设置,但这些设置并没有影响内存或性能。 您的原始函数隐式返回了None
(我不知道为什么您甚至为此使用了不必要的return
语句)。然后告诉我,通过一个看似(对我而言)存储语义上无意义的值的双端队列来炸毁内存的目的是什么。我一直在怀疑这样做是有目的的,只是假设由于您可能的一些误解或心不在焉,您忘记了返回结果。
您设置的 maxlen 值不会影响内存的原因是因为在 itertools.product
方法产生其所有值之前不会构造双端队列,而这本身就需要占了很多内存。在我上面的解决方案中,至少在使用imap
时的intent 是对itertools.product
进行lazy 评估,并在元素可用时将它们一一添加到双端队列中.然而,事实证明itertools.product
本质上是先在内存中构建所有结果,然后再逐个生成值,这就是为什么即使使用imap
内存也会爆炸的原因。以上是关于为大型itertools产品实现内存高效的ThreadPool的主要内容,如果未能解决你的问题,请参考以下文章