并发期货:何时以及如何实施?
Posted
技术标签:
【中文标题】并发期货:何时以及如何实施?【英文标题】:Concurrent Futures: When and how to implement? 【发布时间】:2021-01-07 03:44:20 【问题描述】:from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
import numpy as np
import time
#creating iterable
testDict =
for i in range(1000):
testDict[i] = np.random.randint(1,10)
#default method
stime = time.time()
newdict = []
for k, v in testDict.items():
for i in range(1000):
v = np.tanh(v)
newdict.append(v)
etime = time.time()
print(etime - stime)
#output: 1.1139910221099854
#multi processing
stime = time.time()
testresult = []
def f(item):
x = item[1]
for i in range(1000):
x = np.tanh(x)
return x
def main(testDict):
with ProcessPoolExecutor(max_workers = 8) as executor:
futures = [executor.submit(f, item) for item in testDict.items()]
for future in as_completed(futures):
testresult.append(future.result())
if __name__ == '__main__':
main(testDict)
etime = time.time()
print(etime - stime)
#output: 3.4509658813476562
学习多处理和测试的东西。进行测试以检查我是否正确实现了这一点。查看所花费的输出时间,并发方法慢了 3 倍。那怎么了?
我的目标是并行化一个脚本,该脚本主要在大约 500 个项目的字典上运行。每个循环,这 500 个项目的值都会被处理和更新。这个循环可以说是 5000 代。 k,v 对中没有一个与其他 k,v 对交互。 [它是一种遗传算法]。
我还在寻找有关如何并行化上述目标的指南。如果我在我的遗传算法代码中对我的每个函数使用正确的并发期货方法,其中每个函数接受字典的输入并输出一个新字典,它会有用吗?任何指南/资源/帮助表示赞赏。
编辑:如果我运行这个例子:https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example,它需要比默认的循环检查多 3 倍的时间来解决。
【问题讨论】:
您的机器上有多少个内核?如果将迭代次数从 1000 更改为 10000,您观察到的趋势是否会继续? 10万?您可能只是通过使用如此小的数据集来观察并行化开销。或者,如果您的内核少于 8 个,则可能只是 CPU 过载。 @SethMMorton 4 核。以 10000 跑并看到相同的 3 倍比率。 Overhead 是一种查询途径,但如果可以的话,请查看我对我的帖子所做的编辑:即使是文档示例的运行速度也比列表上的循环慢。 如果您只使用 4 个工人怎么办?您创建的进程似乎是核心的两倍。 由于某种原因恶化了。 【参考方案1】:这里有几个基本问题,您使用的是 numpy,但您没有对计算进行矢量化。您在此处编写代码的方式不会受益于 numpy 的速度优势,还不如使用标准库 math
模块,对于这种代码风格,它比 numpy 更快:
# 0.089sec
import math
for k, v in testDict.items():
for i in range(1000):
v = math.tanh(v)
newdict.append(v)
一旦你对操作进行矢量化,你就会看到 numpy 的好处:
# 0.016sec
for k, v in testDict.items():
arr = no.full(1000, v)
arr2 = np.tanh(arr)
newdict.append(arr2[-1])
作为比较,您的原始单线程代码在我的测试机器上运行时间为 1.171 秒。正如您在此处看到的,如果使用不当,NumPy 甚至可能比纯 Python 慢几个数量级。
现在谈谈为什么你会看到你所看到的。
说实话,我无法复制您的计时结果。您的原始多处理代码在 Python 3.6 上运行我的 macOS 时为 0.299 秒),这比单进程代码快。但是,如果我不得不猜测,您可能使用的是 Windows?在某些平台(如 Windows)中,创建子进程并设置运行多处理任务的环境非常昂贵,因此将多处理用于持续不到几秒钟的任务的好处是可疑的。如果您对原因感兴趣,请read here。
此外,在 Python 3.8 或 Windows 之后的 MacOS 等缺乏可用 fork() 的平台中,当您使用多处理时,子进程必须重新导入模块,因此如果将两个代码放在同一个文件中,它必须在子进程中运行您的单线程代码,然后它才能运行多处理代码。您可能希望将您的测试代码放在一个函数中,并使用 if __name__ == "__main__"
块保护***代码。在装有 Python 3.8 或更高版本的 Mac 上,如果您没有调用 Mac 的非 fork 安全框架库,还可以通过调用 multiprocessing.set_start_method("fork")
恢复使用 fork 方法。
说完,继续你的标题问题。
当您使用多处理时,您需要将数据复制到子进程并返回到主进程以检索结果,并且生成子进程会产生成本。要从多处理中受益,您需要设计工作负载,使这部分成本可以忽略不计。
如果您的数据来自外部来源,请尝试在子进程中加载数据,而不是让主进程加载数据然后将其传输到子进程,让主进程告诉子进程如何获取其切片数据。在这里,您在主进程中生成 testDict,因此如果可以,请将其并行化并将它们移动到子进程中。
另外,由于您使用的是 numpy,如果您正确地矢量化您的操作,numpy 将在执行矢量化操作时释放GIL,因此您可以只使用多线程。由于 numpy 在向量操作期间不持有 GIL,因此您可以在单个 Python 进程中利用多个线程,并且您不需要分叉或将数据复制到子进程,因为线程共享内存。
【讨论】:
我正在学习矢量化,因此我必须在我的遗传算法项目的上下文中询问。如果我代表一个具有 len 6 数组随机排列的基因,比如说 [0 2 3 4 1 5] 并代表我的人口,比如说 200 个人在一个形状为 (200, 6) 的 numpy 数组中,将矢量化有帮助,因为我的基本单位不是一行中的数字,比如说 2,我必须在其上应用函数,而是我在整行本身中的基本单位。经过几次操作后,示例行可能会更改为 [5 4 3 0 1 2]。其他 199 行也会发生同样的情况。 那么矢量化在这里会有所帮助并缩短计算时间吗? @MashhoodAhmad:是的,在玩多处理之前先对代码进行矢量化。如果您的问题不是一个可以轻松矢量化的问题,那么您可能不需要 numpy,那么普通的 python 可能就可以了。您可能想发表一篇单独的帖子来询问如何矢量化您的问题,您的评论中没有足够的细节来说明一种或另一种方式。也就是说,许多机器学习和遗传算法问题都非常适合 numpy 擅长的矢量化算法,尽管它们不一定总是很明显。以上是关于并发期货:何时以及如何实施?的主要内容,如果未能解决你的问题,请参考以下文章