存储结果 ThreadPoolExecutor
Posted
技术标签:
【中文标题】存储结果 ThreadPoolExecutor【英文标题】:store results ThreadPoolExecutor 【发布时间】:2019-02-04 13:13:19 【问题描述】:我对“concurrent.futures”的并行处理相当陌生,我正在测试一些简单的实验。我编写的代码似乎可以工作,但我不确定如何存储结果。我试图创建一个列表(“期货”)并将结果附加到该列表中,但这大大减慢了过程。我想知道是否有更好的方法来做到这一点。谢谢。
import concurrent.futures
import time
couple_ods= []
futures=[]
dtab=
for i in range(100):
for j in range(100):
dtab[i,j]=i+j/2
couple_ods.append((i,j))
avg_speed=100
def task(i):
origin=i[0]
destination=i[1]
time.sleep(0.01)
distance=dtab[origin,destination]/avg_speed
return distance
start1=time.time()
def main():
with concurrent.futures.ThreadPoolExecutor() as executor:
for number in couple_ods:
future=executor.submit(task,number)
futures.append(future.result())
if __name__ == '__main__':
main()
end1=time.time()
【问题讨论】:
【参考方案1】:当您调用 future.result()
时,它会阻塞,直到值准备好。因此,您不会从这里的并行性中获得任何好处——您开始一项任务,等待它完成,开始另一项任务,等待它完成,等等。
当然,您的示例首先不会从线程中受益。你的任务除了 CPU 绑定的 Python 计算什么都不做,这意味着(至少在 CPython、MicroPython 和 PyPy,它们是 concurrent.futures
附带的唯一完整实现),GIL(全局解释器锁)将阻止更多比你的一个线程一次进步。
希望您的真实程序有所不同。如果它正在做 I/O 绑定的事情(发出网络请求、读取文件等),或者使用像 NumPy 这样的扩展库来释放 GIL 以解决繁重的 CPU 工作,那么它会正常工作。但除此之外,您需要在此处使用 ProcessPoolExecutor
。
无论如何,您要做的就是将future
本身附加到一个列表中,这样您就可以在等待它们之前获得所有期货的列表:
for number in couple_ods:
future=executor.submit(task,number)
futures.append(future)
然后,在您开始所有作业后,您就可以开始等待它们了。有三个简单的选项,当您需要更多控制时,还有一个复杂的选项。
(1) 您可以直接循环它们以按提交顺序等待它们:
for future in futures:
result = future.result()
dostuff(result)
(2) 如果您需要等待它们全部完成后再进行任何工作,您可以拨打wait
:
futures, _ = concurrent.futures.wait(futures)
for future in futures:
result = future.result()
dostuff(result)
(3) 如果您想在每个准备就绪后立即处理,即使它们出现故障,请使用as_completed
:
for future in concurrent.futures.as_completed(futures):
dostuff(future.result())
请注意,文档中使用此函数的示例提供了一些方法来确定完成了哪个任务。如果你需要,它可以很简单,只需将每个索引传递一个索引,然后 return index, real_result
,然后你可以 for index, result in …
进行循环。
(4) 如果您需要更多控制权,您可以循环访问wait
ing,了解目前为止所做的任何事情:
while futures:
done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
for future in done:
result = future.result()
dostuff(result)
该示例与as_completed
执行相同的操作,但您可以对其编写细微的变化来执行不同的操作,例如等待所有内容完成但如果有任何引发异常则提前取消。
对于很多简单的情况,你可以只使用执行器的map
方法来简化第一个选项。这就像内置的 map
函数一样,为参数中的每个值调用一次函数,然后为您提供一些可以循环以相同顺序获取结果的内容,但它是并行执行的。所以:
for result in executor.map(task, couple_ods):
dostuff(result)
【讨论】:
我认为答案的as_completed
部分有一个小错误,应该是:for future in concurrent.futures.as_completed(futures): dostuff(future.result())
以上是关于存储结果 ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章