存储结果 ThreadPoolExecutor

Posted

技术标签:

【中文标题】存储结果 ThreadPoolExecutor【英文标题】:store results ThreadPoolExecutor 【发布时间】:2019-02-04 13:13:19 【问题描述】:

我对“concurrent.futures”的并行处理相当陌生,我正在测试一些简单的实验。我编写的代码似乎可以工作,但我不确定如何存储结果。我试图创建一个列表(“期货”)并将结果附加到该列表中,但这大大减慢了过程。我想知道是否有更好的方法来做到这一点。谢谢。

import concurrent.futures
import time

couple_ods= []
futures=[]

dtab=
for i in range(100):
    for j in range(100):
       dtab[i,j]=i+j/2
       couple_ods.append((i,j))

avg_speed=100
def task(i):
    origin=i[0]
    destination=i[1]
    time.sleep(0.01)
    distance=dtab[origin,destination]/avg_speed
    return distance
start1=time.time()
def main():
    with concurrent.futures.ThreadPoolExecutor() as executor:
       for number in couple_ods:
          future=executor.submit(task,number)
          futures.append(future.result())

if __name__ == '__main__':
    main()
end1=time.time()

【问题讨论】:

【参考方案1】:

当您调用 future.result() 时,它会阻塞,直到值准备好。因此,您不会从这里的并行性中获得任何好处——您开始一项任务,等待它完成,开始另一项任务,等待它完成,等等。

当然,您的示例首先不会从线程中受益。你的任务除了 CPU 绑定的 Python 计算什么都不做,这意味着(至少在 CPython、MicroPython 和 PyPy,它们是 concurrent.futures 附带的唯一完整实现),GIL(全局解释器锁)将阻止更多比你的一个线程一次进步。

希望您的真实程序有所不同。如果它正在做 I/O 绑定的事情(发出网络请求、读取文件等),或者使用像 NumPy 这样的扩展库来释放 GIL 以解决繁重的 CPU 工作,那么它会正常工作。但除此之外,您需要在此处使用 ProcessPoolExecutor


无论如何,您要做的就是将future 本身附加到一个列表中,这样您就可以在等待它们之前获得所有期货的列表:

for number in couple_ods:
    future=executor.submit(task,number)
    futures.append(future)
     

然后,在您开始所有作业后,您就可以开始等待它们了。有三个简单的选项,当您需要更多控制时,还有一个复杂的选项。


(1) 您可以直接循环它们以按提交顺序等待它们:

for future in futures:
    result = future.result()
    dostuff(result)

(2) 如果您需要等待它们全部完成后再进行任何工作,您可以拨打wait

futures, _ = concurrent.futures.wait(futures)
for future in futures:
    result = future.result()
    dostuff(result)

(3) 如果您想在每个准备就绪后立即处理,即使它们出现故障,请使用as_completed

for future in concurrent.futures.as_completed(futures): 
    dostuff(future.result())

请注意,文档中使用此函数的示例提供了一些方法来确定完成了哪个任务。如果你需要,它可以很简单,只需将每个索引传递一个索引,然后 return index, real_result,然后你可以 for index, result in … 进行循环。

(4) 如果您需要更多控制权,您可以循环访问waiting,了解目前为止所做的任何事情:

while futures:
    done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
    for future in done:
        result = future.result()
        dostuff(result)

该示例与as_completed 执行相同的操作,但您可以对其编写细微的变化来执行不同的操作,例如等待所有内容完成但如果有任何引发异常则提前取消。


对于很多简单的情况,你可以只使用执行器的map方法来简化第一个选项。这就像内置的 map 函数一样,为参数中的每个值调用一次函数,然后为您提供一些可以循环以相同顺序获取结果的内容,但它是并行执行的。所以:

for result in executor.map(task, couple_ods):
    dostuff(result)

【讨论】:

我认为答案的as_completed部分有一个小错误,应该是:for future in concurrent.futures.as_completed(futures): dostuff(future.result())

以上是关于存储结果 ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

mssql 存储过程调用另一个存储过程中的结果的方法分享

JAVA获取ORACLE存储过程返回结果集的问题

oracle存储过程如何输出结果集

sqlserver 存储过程 返回结果集的 例子

ORACLE 存储过程怎么返回临时表结果集

sql server存储过程如何输出结果集