Asyncio with ProcessPoolExecutor shutdown before finishing all tasks

Posted: 2021-10-28 06:56:21

Question:

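I want to combine ProcessPoolExecutor with asyncio to run the blocking functions in my TestClass concurrently. Each task is meant to run for a long time, so I need a proper shutdown procedure so that everything winds down cleanly when I exit the script. Where do I need to add error handling for KeyboardInterrupt so that all tasks and processes shut down gracefully? I have searched many related topics, but none of them solves my problem. I would appreciate some help! Thanks in advance.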

import asyncio
from concurrent.futures import ProcessPoolExecutor


class TestClass:
    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2


async def task(loop,executor_processes, i):
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes,TestClass)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")


async def main():
    executor_processes = ProcessPoolExecutor(max_workers=5)

    loop_ = asyncio.get_event_loop()
    tasks = []

    for i in range(1, 100):
        tasks.append(task(loop_, executor_processes, i))

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    finally:
        print('Program finished')

Here is the error log after pressing Ctrl+C before all tasks and processes have finished:

Fatal Python error: Fatal Python error: init_import_sizeinit_import_size: : Failed to import the site moduleFailed to import the site module

Python runtime state: Python runtime state: initializedinitialized

Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    main()
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    addsitepackages(known_paths, [sys.prefix])
    addsitedir(sitedir, known_paths)
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    addsitedir(sitedir, known_paths)
    exec(line)
    exec(line)
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 2, in <module>
    exec(line)
  File "<string>", line 1, in <module>
    from contextlib import contextmanager
    from . import abc
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 6, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/abc.py", line 4, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    from contextlib import contextmanager
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 165, in <module>
    exec(line)
  File "<string>", line 1, in <module>
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
    from contextlib import contextmanager
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 5, in <module>
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 160, in addpackage
    f = io.TextIOWrapper(io.open_code(fullname))
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 779, in exec_module
  File "<frozen importlib._bootstrap_external>", line 911, in get_code
  File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
KeyboardInterrupt
    from . import machinery
KeyboardInterrupt
    from functools import wraps
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 438, in <module>
    from collections import deque
    class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 21, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/abc.py", line 85, in __new__
    cls = super().__new__(mcls, name, bases, namespace, **kwargs)
KeyboardInterrupt
    from operator import itemgetter as _itemgetter, eq as _eq
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 779, in exec_module
    _CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 394, in namedtuple
  File "<frozen importlib._bootstrap_external>", line 911, in get_code
  File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
Exception in thread QueueManagerThread:
Traceback (most recent call last):
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    exec(s, namespace)
  File "<string>", line 1, in <module>
ctrl + c
Program finished
KeyboardInterrupt
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 394, in _queue_management_worker
    work_item.future.set_exception(bpe)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7ffed1f2f250 state=cancelled>


Answer 1:

Windows solution

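If you are running on Windows, Ctrl-C interrupt handling does not seem to work well with multiprocessing pools. The following is a bit clumsy, but it seems to work.

The idea is to initialize each process in the multiprocessing pool with a global variable, ctrl_c_entered, that is initially set to False. I have completed your class TestClass with a method foo, which will be the worker function that gets called. When called, it must: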

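1. Test the global flag ctrl_c_entered and, if it is True, return immediately.
2. Have its own KeyboardInterrupt handler: on such an interrupt, it must set the global ctrl_c_entered flag to True and return.
3. Update: Ctrl-C can also be entered while a pool process has not yet handed control to the worker function; for example, it may be fetching the next task to run from the input queue. In that case no try/except would be in effect to catch the KeyboardInterrupt exception. So each process in the pool also needs a SIGINT handler that sets the ctrl_c_entered flag to True. This in turn means that the original, default SIGINT handler must be temporarily restored during step 2 so that the KeyboardInterrupt exception can be caught.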
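Steps 1 and 2 can be tried on their own, without a pool or real signals. The following is a minimal sketch, not the answer's code: the names stop_flag, skip_after_interrupt and work are hypothetical, a KeyboardInterrupt raised by hand stands in for Ctrl-C, and step 3 (swapping the SIGINT handler) is deliberately left out:

```python
from functools import wraps

# Hypothetical stand-in for the per-process ctrl_c_entered global.
stop_flag = False


def skip_after_interrupt(func):
    """Run func unless an interrupt was already seen; turn KeyboardInterrupt into a return value."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        global stop_flag
        if stop_flag:                 # step 1: flag already set, do no work at all
            return KeyboardInterrupt()
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:     # step 2: record the interrupt and return normally
            stop_flag = True
            return KeyboardInterrupt()
    return wrapper


calls = []  # records which calls actually ran the function body


@skip_after_interrupt
def work(i):
    calls.append(i)
    if i == 2:
        raise KeyboardInterrupt  # simulate Ctrl-C arriving during this call
    return i * i


results = [work(i) for i in range(1, 5)]
print(results, calls)
```

Returning the KeyboardInterrupt instance rather than re-raising it means the caller receives an ordinary result, which is the same choice handle_ctrl_c makes in the answer.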

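You also have to let all the submitted asyncio tasks complete. So we install a signal.SIGINT handler in the main process that, if Ctrl-C is entered, sets the main process's global ctrl_c_entered flag to True, so that we do not break out of the asyncio.run(main()) statement. Our long-running asyncio tasks must check this ctrl_c_entered flag and terminate if it is set to True.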
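The main-process side of this idea can be sketched without a process pool. Assumptions: stop_requested and request_stop are hypothetical names standing in for ctrl_c_entered and ctrl_c_handler, signal.raise_signal (Python 3.8+) simulates the user pressing Ctrl-C, and each task sleeps briefly between units of work:

```python
import asyncio
import signal

# Hypothetical name; plays the role of ctrl_c_entered in the main process.
stop_requested = False


def request_stop(signum, frame):
    # Instead of raising KeyboardInterrupt, just record that Ctrl-C was seen.
    global stop_requested
    stop_requested = True


async def long_task(i, steps=1000):
    for _ in range(steps):
        if stop_requested:            # cooperative check between units of work
            return ("stopped", i)
        await asyncio.sleep(0.01)
    return ("finished", i)


async def main():
    tasks = [asyncio.create_task(long_task(i)) for i in range(3)]
    await asyncio.sleep(0.05)
    signal.raise_signal(signal.SIGINT)   # simulate the user pressing Ctrl-C
    return await asyncio.gather(*tasks)  # every task still returns normally


previous = signal.signal(signal.SIGINT, request_stop)
try:
    results = asyncio.run(main())
finally:
    signal.signal(signal.SIGINT, previous)  # restore whatever handler was there before

print(results)
```

Because the handler only sets a flag, asyncio.run(main()) is not interrupted; gather still returns, and each task reports that it stopped early.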

import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper


class TestClass:
    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    @handle_ctrl_c
    def foo(self, i):
        time.sleep(1)
        return i ** 2

async def task(loop, executor_processes, i):
    # If this is a long-running task, periodically check the ctrl_c_entered flag and return if it is set.
    # For example:
    if ctrl_c_entered:
        return KeyboardInterrupt()
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo, i)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
    return new_test

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

async def main():
    executor_processes = ProcessPoolExecutor(max_workers=5, initializer=init_pool)

    loop_ = asyncio.get_event_loop()
    tasks = []

    for i in range(1, 100):
        tasks.append(task(loop_, executor_processes, i))

    results = await asyncio.gather(*tasks)
    print(results)

def ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

if __name__ == '__main__':
    ctrl_c_entered = False
    signal.signal(signal.SIGINT, ctrl_c_handler)
    asyncio.run(main())
    print('Program finished')

Prints (for a run with 19 tasks, with Ctrl-C entered after the first 10 had finished):

[TASK 1] Initializing Abck class
[TASK 2] Initializing Abck class
[TASK 3] Initializing Abck class
[TASK 4] Initializing Abck class
[TASK 5] Initializing Abck class
[TASK 6] Initializing Abck class
[TASK 7] Initializing Abck class
[TASK 8] Initializing Abck class
[TASK 9] Initializing Abck class
[TASK 10] Initializing Abck class
[TASK 11] Initializing Abck class
[TASK 12] Initializing Abck class
[TASK 13] Initializing Abck class
[TASK 14] Initializing Abck class
[TASK 15] Initializing Abck class
[TASK 16] Initializing Abck class
[TASK 17] Initializing Abck class
[TASK 18] Initializing Abck class
[TASK 19] Initializing Abck class
[TASK 1] Finished
[TASK 2] Finished
[TASK 3] Finished
[TASK 4] Finished
[TASK 5] Finished
[TASK 6] Finished
[TASK 7] Finished
[TASK 9] Finished
[TASK 8] Finished
[TASK 10] Finished
ctrl + c
ctrl + c
ctrl + c
ctrl + c
ctrl + c
[TASK 13] Finished
[TASK 16] Finished
[TASK 17] Finished
[TASK 18] Finished
[TASK 19] Finished
[TASK 14] Finished
[TASK 12] Finished
[TASK 11] Finished
[TASK 15] Finished
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt()]

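Solution for Linux and other platforms that use fork

This is simpler, because interrupt handling more or less works with multiprocessing pools. The simplest way to handle it is, again, to initialize a global running flag in each pool process; the worker function checks it periodically and terminates if it is False. Each pool process also installs a Ctrl-C handler that sets running to False when the user enters Ctrl-C, which takes care of terminating any tasks that are already running. The main process can simply handle the KeyboardInterrupt exception: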

import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time

class TestClass:
    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    def foo(self):
        for _ in range(20):
            if not running:
                return
            time.sleep(.1)

async def task(loop, executor_processes, i):
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")

def ctrl_c_handler(*args, **kwargs):
    global running
    running = False

def init_pool():
    global running
    running = True
    signal.signal(signal.SIGINT, ctrl_c_handler)

async def main():
    executor_processes = ProcessPoolExecutor(max_workers=5, initializer=init_pool)

    loop_ = asyncio.get_event_loop()
    tasks = []

    for i in range(1, 100):
        tasks.append(task(loop_, executor_processes, i))

    await asyncio.gather(*tasks)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    print('Program finished')
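What the handler installed by init_pool does inside each worker can be checked in a single process. This sketch assumes a foo that does short units of work, and it uses a hypothetical simulate_ctrl_c_at parameter together with signal.raise_signal (Python 3.8+) to deliver SIGINT partway through:

```python
import signal
import time

running = True  # per-process flag, as set up by init_pool in the answer


def ctrl_c_handler(signum, frame):
    global running
    running = False


def foo(steps=20, simulate_ctrl_c_at=None):
    """Do `steps` short units of work, stopping early once `running` is False."""
    completed = 0
    for step in range(steps):
        if not running:
            return completed
        if step == simulate_ctrl_c_at:
            signal.raise_signal(signal.SIGINT)  # hypothetical stand-in for the user pressing Ctrl-C
        time.sleep(.01)
        completed += 1
    return completed


previous = signal.signal(signal.SIGINT, ctrl_c_handler)  # what init_pool does in each worker
try:
    result = foo(simulate_ctrl_c_at=5)
finally:
    signal.signal(signal.SIGINT, previous)

print(result)
```

The unit of work that is in progress when the signal arrives still finishes, and the loop exits at the next check of running, so how often the flag is checked bounds how quickly a worker can shut down.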

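Comments:

Thanks for your answer. I also tried this example, but using signal.signal(signal.SIGINT, signal.SIG_IGN) and done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) instead of await asyncio.gather(*tasks), together with global variables. It seems to have a similar effect. What I find interesting is that this error is also raised if Ctrl+C is pressed before TestClass has been initialized in all the tasks.

Actually, I just updated the Windows solution; see the new description of step 3. There is a window in which Ctrl-C can be entered while a pool process is not actually running a task but may be about to run one. This can result in an unhandled exception. See the updated solution.

I tried your updated code, but when I tried to stop the script in the scenario described in step 3 of your answer, it raised another error: concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I modified the Windows version of foo to accept an argument and return a result (it squares its argument), and had main print out 20 results (reduced from 100 to keep the listing a reasonable size). If Ctrl-C is entered, the returned result is an instance of KeyboardInterrupt. I entered Ctrl-C after the first 10 of the 20 tasks had completed.

I see what you mean. I don't think I will find a solution right away. However, I have updated the Windows source again. I created a decorator function, handle_ctrl_c, that you can use to decorate your pool worker functions. All the interrupt handling is in this decorator, and I replaced the global running flag with the more intuitive ctrl_c_entered flag, which is initially False.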
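The asyncio.wait variant from the first comment can be illustrated with plain coroutines. This is only a sketch: job is a hypothetical stand-in for real work, and a RuntimeError plays the part of the first exception. asyncio.wait returns as soon as any task raises, and it is up to the caller to cancel whatever is still pending:

```python
import asyncio


async def job(i):
    # Hypothetical stand-in for a long-running task; job 3 fails early.
    await asyncio.sleep(0.01 * i)
    if i == 3:
        raise RuntimeError(f"job {i} failed")
    await asyncio.sleep(10)  # the other jobs would run much longer
    return i


async def main():
    tasks = [asyncio.create_task(job(i)) for i in range(1, 6)]
    # Return as soon as any task raises (or all finish), instead of using gather().
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for t in pending:
        t.cancel()
    # Wait until the cancellations have actually been processed.
    await asyncio.gather(*pending, return_exceptions=True)
    errors = [t.exception() for t in done if t.exception() is not None]
    return len(done), len(pending), errors


done_count, pending_count, errors = asyncio.run(main())
print(done_count, pending_count, errors)
```

Unlike gather, this leaves the decision about the remaining tasks to the caller; here they are cancelled explicitly and awaited so that nothing is left running when asyncio.run returns.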
