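Asyncio with ProcessPoolExecutor shutdown before finishing all tasks
【Posted】: 2021-10-28 06:56:21
【Question】: I want to combine ProcessPoolExecutor with asyncio to run the blocking functions in my TestClass concurrently. Each task is meant to run for a long time, so I need an effective shutdown process that lets everything exit cleanly when the script is stopped. Where do I need to add error handling for KeyboardInterrupt so that all tasks and processes shut down smoothly? I have searched many related topics, but none of them solved my problem. I hope to get some help. Thanks in advance.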
import asyncio
from concurrent.futures import ProcessPoolExecutor

class TestClass:
    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

async def task(loop, executor_processes, i):
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")

async def main():
    executor_processes = ProcessPoolExecutor(max_workers=5)
    loop_ = asyncio.get_event_loop()
    tasks = []
    for i in range(1, 100):
        tasks.append(task(loop_, executor_processes, i))
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    finally:
        print('Program finished')
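This is the error log after pressing Ctrl+C before all tasks and processes have finished. The output of several processes is interleaved.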
Fatal Python error: Fatal Python error: init_import_sizeinit_import_size: : Failed to import the site moduleFailed to import the site module
Python runtime state: Python runtime state: initializedinitialized
Traceback (most recent call last):
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
main()
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
addsitepackages(known_paths, [sys.prefix])
addsitedir(sitedir, known_paths)
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
addsitedir(sitedir, known_paths)
exec(line)
exec(line)
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 2, in <module>
exec(line)
File "<string>", line 1, in <module>
from contextlib import contextmanager
from . import abc
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 6, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/abc.py", line 4, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
from contextlib import contextmanager
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 165, in <module>
exec(line)
File "<string>", line 1, in <module>
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
from contextlib import contextmanager
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 5, in <module>
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 160, in addpackage
f = io.TextIOWrapper(io.open_code(fullname))
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 779, in exec_module
File "<frozen importlib._bootstrap_external>", line 911, in get_code
File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
KeyboardInterrupt
from . import machinery
KeyboardInterrupt
from functools import wraps
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 438, in <module>
from collections import deque
class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 21, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/abc.py", line 85, in __new__
cls = super().__new__(mcls, name, bases, namespace, **kwargs)
KeyboardInterrupt
from operator import itemgetter as _itemgetter, eq as _eq
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 779, in exec_module
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 394, in namedtuple
File "<frozen importlib._bootstrap_external>", line 911, in get_code
File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
Exception in thread QueueManagerThread:
Traceback (most recent call last):
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
exec(s, namespace)
File "<string>", line 1, in <module>
ctrl + c
Program finished
KeyboardInterrupt
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 394, in _queue_management_worker
work_item.future.set_exception(bpe)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7ffed1f2f250 state=cancelled>
【Question comments】:
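【Answer 1】: Windows Solution
If you are running on Windows, Ctrl-C interrupt handling does not seem to work well with multiprocessing pools. The following is a bit of a kludge, but it appears to be a widely used approach.
The idea is to initialize each process in the multiprocessing pool with a global variable ctrl_c_entered that is initially set to False. I have fleshed out your class TestClass with a method foo, which is the worker function that gets invoked. When invoked, it must:
1. Test the global flag ctrl_c_entered and, if it is True, return immediately.
2. Have its own KeyboardInterrupt handler; on such an interrupt, it must set the global ctrl_c_entered flag to True and return.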
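Update: However, Ctrl-C may be entered while a pool process has not yet transferred control to the worker function; for example, it may be fetching the next task to run from its input queue. In that case no try/except for a KeyboardInterrupt exception would otherwise be in effect. So we need to install an interrupt handler in each pool process that sets the ctrl_c_entered flag to True. This in turn means that the original, default SIGINT handler must be temporarily restored in step 2 above so that the KeyboardInterrupt exception can be caught.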
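The idea of temporarily restoring the default handler can be sketched in isolation. This is only an illustration, not the answer's code: the names `state`, `flag_handler`, and `run_interruptibly` are hypothetical, and the sketch assumes it runs in the main thread of a process whose current SIGINT handler was installed from Python, since `signal.signal` may only be called from the main thread.

```python
import signal

# Hypothetical stand-in for the answer's global ctrl_c_entered flag.
state = {"interrupted": False}


def flag_handler(signum, frame):
    """Record the interrupt instead of raising KeyboardInterrupt."""
    state["interrupted"] = True


def run_interruptibly(func, *args):
    """Call func with Python's default SIGINT handler temporarily in effect.

    While func runs, Ctrl-C raises KeyboardInterrupt as usual. Afterwards the
    handler that was installed before the call is put back.
    """
    previous = signal.signal(signal.SIGINT, signal.default_int_handler)
    try:
        return func(*args)
    except KeyboardInterrupt:
        state["interrupted"] = True
        return None
    finally:
        signal.signal(signal.SIGINT, previous)
```

Outside `run_interruptibly`, a process that has installed `flag_handler` only records Ctrl-C in `state`. Inside it, the interrupt surfaces as an exception that can be caught right around the worker call.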
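You must also let all submitted asyncio tasks complete. So we install a signal.SIGINT handler in the main process that sets the main process's global ctrl_c_entered flag to True if Ctrl-C is entered (we do not break out of the asyncio.run(main()) statement). Long-running asyncio tasks must check this ctrl_c_entered flag and terminate if it is set to True.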
import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time
from functools import wraps

def handle_ctrl_c(func):
    """Decorator for pool worker functions; all interrupt handling lives here."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                # go back to only setting the flag while no task is running
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

class TestClass:
    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    @handle_ctrl_c
    def foo(self, i):
        time.sleep(1)
        return i ** 2

async def task(loop, executor_processes, i):
    # If this is a long-running task, periodically check running flag and return if set.
    # For example:
    if ctrl_c_entered:
        return KeyboardInterrupt()
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo, i)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
    return new_test

def pool_ctrl_c_handler(*args, **kwargs):
    # SIGINT handler for pool processes: only record that Ctrl-C was entered
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    # signal.signal returns the previous handler, which is saved as the default
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

async def main():
    executor_processes = ProcessPoolExecutor(max_workers=5, initializer=init_pool)
    loop_ = asyncio.get_event_loop()
    tasks = []
    for i in range(1, 100):
        tasks.append(task(loop_, executor_processes, i))
    results = await asyncio.gather(*tasks)
    print(results)

def ctrl_c_handler(*args, **kwargs):
    # SIGINT handler for the main process: only record that Ctrl-C was entered
    global ctrl_c_entered
    ctrl_c_entered = True

if __name__ == '__main__':
    ctrl_c_entered = False
    signal.signal(signal.SIGINT, ctrl_c_handler)
    asyncio.run(main())
    print('Program finished')
Prints:
[TASK 1] Initializing Abck class
[TASK 2] Initializing Abck class
[TASK 3] Initializing Abck class
[TASK 4] Initializing Abck class
[TASK 5] Initializing Abck class
[TASK 6] Initializing Abck class
[TASK 7] Initializing Abck class
[TASK 8] Initializing Abck class
[TASK 9] Initializing Abck class
[TASK 10] Initializing Abck class
[TASK 11] Initializing Abck class
[TASK 12] Initializing Abck class
[TASK 13] Initializing Abck class
[TASK 14] Initializing Abck class
[TASK 15] Initializing Abck class
[TASK 16] Initializing Abck class
[TASK 17] Initializing Abck class
[TASK 18] Initializing Abck class
[TASK 19] Initializing Abck class
[TASK 1] Finished
[TASK 2] Finished
[TASK 3] Finished
[TASK 4] Finished
[TASK 5] Finished
[TASK 6] Finished
[TASK 7] Finished
[TASK 9] Finished
[TASK 8] Finished
[TASK 10] Finished
ctrl + c
ctrl + c
ctrl + c
ctrl + c
ctrl + c
[TASK 13] Finished
[TASK 16] Finished
[TASK 17] Finished
[TASK 18] Finished
[TASK 19] Finished
[TASK 14] Finished
[TASK 12] Finished
[TASK 11] Finished
[TASK 15] Finished
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt()]
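Because interrupted tasks return a KeyboardInterrupt instance instead of raising, the caller has to separate real results from these placeholders. A minimal sketch of one way to do that; the helper name `split_results` is hypothetical and not part of the answer:

```python
def split_results(results):
    """Separate normal results from KeyboardInterrupt placeholders."""
    completed = [r for r in results if not isinstance(r, KeyboardInterrupt)]
    interrupted_count = len(results) - len(completed)
    return completed, interrupted_count


results = [1, 4, 9, KeyboardInterrupt(), KeyboardInterrupt()]
completed, interrupted_count = split_results(results)
print(completed, interrupted_count)  # prints: [1, 4, 9] 2
```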
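Solution for Linux and Platforms That Use fork
This is simpler because interrupt handling more or less works with multiprocessing pools. The simplest way to handle this is, again, to initialize each pool process with a global running flag that the worker function checks periodically, terminating if it is False. Each pool process installs a Ctrl-C handler that sets running to False when the user enters Ctrl-C. This takes care of terminating any tasks that are already running. The main process can simply handle the KeyboardInterrupt exception: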
import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time

class TestClass:
    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    def foo(self):
        # check the per-process running flag between short units of work
        for _ in range(20):
            if not running:
                return
            time.sleep(.1)

async def task(loop, executor_processes, i):
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")

def ctrl_c_handler(*args, **kwargs):
    global running
    running = False

def init_pool():
    # runs once in each pool process
    global running
    running = True
    signal.signal(signal.SIGINT, ctrl_c_handler)

async def main():
    executor_processes = ProcessPoolExecutor(max_workers=5, initializer=init_pool)
    loop_ = asyncio.get_event_loop()
    tasks = []
    for i in range(1, 100):
        tasks.append(task(loop_, executor_processes, i))
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    print('Program finished')
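Neither listing shuts the executor down explicitly. One possible way to do so is sketched below, under stated assumptions: the names `run_all` and `square` are hypothetical, and the `cancel_futures` argument of `Executor.shutdown` requires Python 3.9 or later (the traceback in the question comes from Python 3.8, where only `shutdown(wait=True)` is available). The helper accepts any `concurrent.futures.Executor`; a thread pool is used here only to keep the example small, and a `ProcessPoolExecutor` with a picklable function could be passed instead.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


async def run_all(executor, func, args):
    """Run func(arg) for every arg in the executor, then shut the executor down."""
    loop = asyncio.get_running_loop()
    try:
        futures = [loop.run_in_executor(executor, func, arg) for arg in args]
        return await asyncio.gather(*futures)
    finally:
        # Drop queued work that has not started and wait for running calls.
        # cancel_futures was added in Python 3.9.
        executor.shutdown(wait=True, cancel_futures=True)


if __name__ == "__main__":
    pool = ThreadPoolExecutor(max_workers=2)
    print(asyncio.run(run_all(pool, square, range(5))))
```

Note that `shutdown(wait=True)` blocks the event loop until calls that are already running have returned, so this only exits promptly if the worker function itself checks a flag such as `running`.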
【Comments】:
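Thanks for your answer. I also tried this example, but with signal.signal(signal.SIGINT, signal.SIG_IGN) and done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) instead of await asyncio.gather(*tasks), together with a global variable. It seems to have a similar effect. What I find interesting is that this error is also raised if Ctrl+C is pressed before TestClass has been initialized in all tasks.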
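Actually, I just updated the Windows solution. See the new step 3 description. There is a window in which Ctrl-C can be entered while a pool process is not actually running a task but may be about to. This can result in an unhandled exception. See the updated solution.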
I tried your updated code, but when I tried to stop the script in the scenario from step 3 of your reply, it raised another error: concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
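I modified the Windows version of foo to take an argument and return a result (it squares its argument) and had main print out 20 results (reduced from 100 to keep the listing a reasonable size). If Ctrl-C is entered, the returned result is an instance of KeyboardInterrupt. I entered Ctrl-C after the first 10 of the 20 tasks had completed.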
I see what you mean. I don't think I will find a solution right away. However, I have updated the Windows source code again. I created a decorator function handle_ctrl_c that you can use to decorate your pool worker function. All the interrupt handling is in this decorator, and I switched the global running flag to the more intuitive ctrl_c_entered flag, which is initially False.