带计数器的多处理嵌套 for 循环
Posted
技术标签:
【中文标题】带计数器的多处理嵌套 for 循环【英文标题】:Multiprocessing nested for loop with counter 【发布时间】:2021-08-07 19:45:03 【问题描述】:我正在寻找一种简单的解决方案,它可以帮助我充分利用 PC 的能力来处理我的数据。我认为,将任务划分到不同的核心将有助于减少处理时间,但我不知道该怎么做,我在 *** 上搜索过类似的问题,但没有任何解决方案可以解决我的问题。我正在处理大约长度的数据:3000,并且由于我使用嵌套 for 循环来查找列表中相似(在 +- 0.5 范围内)元素的数量,它将运行 3000x3000 次,大约需要 2 分钟,我想减少花费的时间。
repeat= []
values = []
for i in completeList:
count = 0
for j in completeList:
if isfloat(i) and isfloat(j):
if float(i)-0.5 <= float(j) <= float(i)+0.5:
count = count + 1
repeat.append(count)
values.append(i)
任何帮助将不胜感激。
关于, 马尼什
【问题讨论】:
很遗憾,这不是minimal, reproducible example,因为缺少isfloat
和completeList
的定义。但如果这些都像人们想象的那样微不足道,我很惊讶它需要 2 分钟才能运行。此外,在您的双循环中,如果列表有 3000 个元素,那么您将比较具有相同标识的两个元素 3000 次。你真的想这样做吗?如果不是简单的list
实例,您应该发布isfloat
和completeList
。
在我的桌面上,completeList
是 3000 个数字,isfloat
刚刚返回 True
,程序运行时间不到 4 秒。使用这种微不足道的处理,由于创建进程和将数据从一个地址空间传输到另一个地址空间以及从一个地址空间传输到另一个地址空间的开销,使用多处理无法获得任何重要的结果。这就是为什么我们需要查看您的代码的更多。此外,只需计算一次 float(i)
。
completeList 只是一个长度在 3000-4000 左右的普通列表。是的,isfloat 只返回 true 和 false。我的主要目标是找出特定范围内的所有元素 (float(i)-0.5
很可能算法方法比并行化要快得多。在大多数情况下,大多数操作都是在 O(n^2) 时完成的,而这可以在 O(n) 时完成(前提是数据不是高度退化的)——这比并行性甚至理论上可以提供的加速要好得多。对于初学者,通过isfloat
过滤completeList
并将每个元素转换为float
在嵌套循环之前 基本上是免费的O(n^2)-to-O(n) 优化。元素比较可以通过排序和移动窗口来加速。但如前所述,这需要minimal reproducible example 来确定它是否值得。
【参考方案1】:
由于您仍然没有发布isfloat
的实际代码或显示completeList
的元素是什么样的,我能做的最好的就是推测它们可能是什么。这有所不同,因为正如我所提到的,执行 isfloat
和 float
以转换 completeList
的元素所需的 CPU 越多,使用多处理获得的收益就越大。
对于CASE 1,我假设completeList
是由字符串组成的,isfloat
需要使用正则表达式来确定字符串是否与我们预期的浮点格式匹配并且@987654328 @ 因此需要从字符串转换。这将是我想象中 CPU 最密集的情况。对于CASE 2,completeList
由浮点数组成,isfloat
只返回True
,float
不必做任何真正的转换。
我的桌面有 8 个核心处理器:
案例 1
import multiprocessing as mp
import time
import random
import re
from functools import partial
def isfloat(s):
return not re.fullmatch(r'\d*\.\d+', s) is None
def single_process(complete_list):
#repeat = []
values = []
for idx_i, v_i in enumerate(complete_list):
count = 0
for idx_j, v_j in enumerate(complete_list):
if idx_i == idx_j:
continue # don't compare an element with itself
if isfloat(v_i) and isfloat(v_j):
f_i = float(v_i)
if f_i-0.5 <= float(v_j) <= f_i+0.5:
count = count + 1
# repeat will end up being a copy of complete_list
# why are we doing this?
#repeat.append(v_i)
values.append(count) # these are actually counts
return values
def multi_worker(complete_list, index_range):
values = []
for idx_i in index_range:
v_i = complete_list[idx_i]
count = 0
for idx_j, v_j in enumerate(complete_list):
if idx_i == idx_j:
continue # don't compare an element with itself
if isfloat(v_i) and isfloat(v_j):
f_i = float(v_i)
if f_i-0.5 <= float(v_j) <= f_i+0.5:
count = count + 1
values.append(count) # these are actually counts
return values
def multi_process(complete_list):
def split(a, n):
k, m = divmod(len(a), n)
return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
n = len(complete_list)
POOL_SIZE = mp.cpu_count()
range_splits = split(range(0, n), POOL_SIZE)
pool = mp.Pool(POOL_SIZE)
value_lists = pool.map(partial(multi_worker, complete_list), range_splits)
values = []
# join results together:
for value_list in value_lists:
values.extend(value_list)
return values
def main():
# generate 3000 random numbers:
random.seed(0)
complete_list = [str(random.uniform(1.0, 3.0)) for _ in range(3000)]
t = time.time()
values = single_process(complete_list)
print(time.time() - t, values[0:10], values[-10:-1])
t = time.time()
values = multi_process(complete_list)
print(time.time() - t, values[0:10], values[-10:-1])
# required for Windows:
if __name__ == '__main__':
main()
打印:
27.7540442943573 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
7.187546253204346 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
案例 2
import multiprocessing as mp
import time
import random
from functools import partial
def isfloat(s):
return True
def single_process(complete_list):
values = []
for idx_i, v_i in enumerate(complete_list):
count = 0
for idx_j, v_j in enumerate(complete_list):
if idx_i == idx_j:
continue # don't compare an element with itself
if isfloat(v_i) and isfloat(v_j):
f_i = float(v_i)
if f_i-0.5 <= float(v_j) <= f_i+0.5:
count = count + 1
values.append(count) # these are actually counts
return values
def multi_worker(complete_list, index_range):
values = []
for idx_i in index_range:
v_i = complete_list[idx_i]
count = 0
for idx_j, v_j in enumerate(complete_list):
if idx_i == idx_j:
continue # don't compare an element with itself
if isfloat(v_i) and isfloat(v_j):
f_i = float(v_i)
if f_i-0.5 <= float(v_j) <= f_i+0.5:
count = count + 1
values.append(count) # these are actually counts
return values
def multi_process(complete_list):
def split(a, n):
k, m = divmod(len(a), n)
return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
n = len(complete_list)
POOL_SIZE = mp.cpu_count()
range_splits = split(range(0, n), POOL_SIZE)
pool = mp.Pool(POOL_SIZE)
value_lists = pool.map(partial(multi_worker, complete_list), range_splits)
values = []
# join results together:
for value_list in value_lists:
values.extend(value_list)
return values
def main():
# generate 3000 random numbers:
random.seed(0)
complete_list = [random.uniform(1.0, 3.0) for _ in range(3000)]
t = time.time()
values = single_process(complete_list)
print(time.time() - t, values[0:10], values[-10:-1])
t = time.time()
values = multi_process(complete_list)
print(time.time() - t, values[0:10], values[-10:-1])
# required for Windows:
if __name__ == '__main__':
main()
打印:
4.181002378463745 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
1.325998067855835 [1236, 1491, 1464, 1477, 1494, 1472, 1410, 1450, 1502, 1537] [1485, 1513, 1513, 1501, 1283, 1538, 804, 1459, 1457]
结果
对于案例 1,加速比为 3.86,对于案例 2,加速比仅为 3.14。
【讨论】:
以上是关于带计数器的多处理嵌套 for 循环的主要内容,如果未能解决你的问题,请参考以下文章