使用 concurrent.futures 模块中的 ThreadPoolExecutor 终止执行程序

Posted

技术标签:

【中文标题】使用 concurrent.futures 模块中的 ThreadPoolExecutor 终止执行程序【英文标题】:Terminate executor using ThreadPoolExecutor from concurrent.futures module 【发布时间】:2021-10-14 02:06:48 【问题描述】:

我正在尝试根据从长时间运行的请求返回的值终止 ThreadPool。一旦请求返回值的总和达到MIN_REQUIRED_VALUE,我希望终止线程池

我确信问题在于我正在创建一个完整的期货列表,而这些期货总是必须得到解决。我不确定如何在不使用 ThreadPoolExecutor 创建列表的情况下执行请求

我知道有几个与终止线程池相关的问题。我发现了类似的问题,但答案似乎无法处理返回值。

类似问题:

Python ThreadPoolExecutor terminate all threads asyncio: Is it possible to cancel a future been run by an Executor?

如果有更好的方法可以用另一个模块做到这一点,那很好。

任何帮助将不胜感激。

from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

NUM_REQUESTS = 50
MIN_REQUIRED_VALUE = 30


def long_request(id):
    sleep(3)
    return "data": "value": 10


def check_results(results):
    total = 0
    for result in results:
        total += result["data"]["value"]

    return total


def main():
    futures = []
    responses = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Create Futures List
            futures.append(future)

        for future in as_completed(futures):
            responses.append(future.result())

            # Check minimum value reached
            total = check_results(responses)
            if total > MIN_REQUIRED_VALUE:
                executor.shutdown(wait=False)


if __name__ == "__main__":
    main()

【问题讨论】:

【参考方案1】:

我更改了代码以仅在未达到 MIN_REQUIRED_VALUE 时附加带有结果的期货,并遍历所有未决期货并在达到 MIN_REQUIRED_VALUE 时取消它们。

您可以注意到我添加了 num_requests 来检查提交的请求数,在这种情况下结果正好是 6,这是预期的。

如果有人有更好的方法来做到这一点会很高兴。

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 1000
MIN_REQUIRED_VALUE = 50


def long_request(id):
    sleep(1)
    return "data": "value": 10


def check_results(results):
    total = 0
    for result in results:
        total += result["data"]["value"]

    return total


def main():
    futures = []
    responses = []
    num_requests = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Future list
            futures.append(future)

        for future in as_completed(futures):

            # --- Changed Logic Below ---
            total = check_results(responses)

            if total > MIN_REQUIRED_VALUE:
                for pending_future in futures:
                    pending_future.cancel()
            else:
                num_requests += 1
                responses.append(future.result())

    return num_requests


if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)

【讨论】:

@gold_cy 你是对的, executor.shutdown(wait=False) 现在工作正常。我一开始就尝试过并放弃了它,因为它不符合我的第一个逻辑。我用第二个逻辑尝试了它,它有效。它必须做类似于循环不完整的期货并调用取消的事情

以上是关于使用 concurrent.futures 模块中的 ThreadPoolExecutor 终止执行程序的主要内容,如果未能解决你的问题,请参考以下文章

使用 concurrent.futures 模块中的 ThreadPoolExecutor 终止执行程序

使用concurrent.futures模块并发,实现进程池线程池

concurrent.futures模块与协程

35concurrent.futures模块与协程

python中的concurrent.futures模块

创建进程池与线程池concurrent.futures模块的使用