嵌套 concurrent.futures.ThreadPoolExecutor
Posted
技术标签:
【中文标题】嵌套 concurrent.futures.ThreadPoolExecutor【英文标题】:Nesting concurrent.futures.ThreadPoolExecutor 【发布时间】:2017-12-12 20:32:00 【问题描述】:我有一个程序,我目前正在使用 concurrent.futures.ThreadPoolExecutor 来同时运行多个任务。这些任务通常是 I/O 绑定的,涉及对本地数据库和远程 REST API 的访问。但是,这些任务本身可以拆分为子任务,这也将受益于并发性。
我希望在任务中使用 concurrent.futures.ThreadPoolExecutor 是安全的。我编写了一个玩具示例,似乎可行:
import concurrent.futures
def inner(i, j):
return i, j, i**j
def outer(i):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = executor.submit(inner, i, j): j for j in range(5)
results = []
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = executor.submit(outer, i): i for i in range(10)
results = []
for future in concurrent.futures.as_completed(futures):
results.extend(future.result())
print(results)
if __name__ == "__main__":
main()
虽然这个玩具示例似乎有效,但我希望这是有意为之的。我希望是这样,否则使用 executor 执行任意代码是不安全的,以防它还使用 concurrent.futures 来利用并发性。
【问题讨论】:
Mhhh 认为您应该避免使用分叉炸弹。您是否衡量过子线程前后花费的时间? 【参考方案1】:从其他线程生成线程绝对没有问题。你的情况也不例外。
但迟早,产生线程的开销会相当高,产生更多线程实际上会导致您的软件变慢。
我强烈建议使用像 asyncio 这样的库,它可以很好地异步处理任务。它通过使用一个具有非阻塞 io 的线程来实现。结果可能会比使用普通线程甚至更快,因为开销要小得多。
如果您不想使用 asyncio,为什么不在 main 中创建另一个池执行器,并将其传递给 outer()
函数?这样一来,您将拥有最多 10 (2x5) 个线程,而不是 25 (5x5) 个线程,这更合理吗?
您不能将调用outer()
的同一main()
执行程序传递给outer()
,因为这可能会导致死锁(每个outer()
等待另一个outer()
完成,然后才能安排inner()
)。
【讨论】:
以上是关于嵌套 concurrent.futures.ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章