如何用 ProcessPoolExecutor 优雅地终止 loop.run_in_executor?
Posted
技术标签:
【中文标题】如何用 ProcessPoolExecutor 优雅地终止 loop.run_in_executor?【英文标题】:How to terminate loop.run_in_executor with ProcessPoolExecutor gracefully? 【发布时间】:2020-09-03 17:53:25 【问题描述】:如何优雅地终止loop.run_in_executor
和ProcessPoolExecutor
?启动程序后不久,发送 SIGINT (ctrl + c)。
def blocking_task():
sleep(3)
async def main():
exe = concurrent.futures.ProcessPoolExecutor(max_workers=4)
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(exe, blocking_task) for i in range(3)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('ctrl + c')
max_workers
等于或小于任务数,一切正常。但是如果max_workers
更大,上面代码的输出如下:
Process ForkProcess-4:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
res = self._recv_bytes()
File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
ctrl + c
我想只捕获一次异常 (KeyboardInterrupt) 并忽略或静音进程池中的其他异常,但如何?
更新额外积分:
您能解释一下多重异常(原因)吗? 添加信号处理程序是否适用于 Windows? 如果没有,是否有在没有信号处理程序的情况下工作的解决方案?【问题讨论】:
【参考方案1】:您可以使用ProcessPoolExecutor
的initializer
parameter 在每个进程中为SIGINT
安装一个处理程序。
更新:
在 Unix 上,当创建进程时,它成为其父进程组的成员。如果您使用Ctrl+C
生成SIGINT
,则信号正在发送到整个进程组。
import asyncio
import concurrent.futures
import os
import signal
import sys
from time import sleep
def handler(signum, frame):
print('SIGINT for PID=', os.getpid())
sys.exit(0)
def init():
signal.signal(signal.SIGINT, handler)
def blocking_task():
sleep(15)
async def main():
exe = concurrent.futures.ProcessPoolExecutor(max_workers=5, initializer=init)
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(exe, blocking_task) for i in range(2)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('ctrl + c')
Ctrl-C
开始后不久:
^CSIGINT for PID= 59942
SIGINT for PID= 59943
SIGINT for PID= 59941
SIGINT for PID= 59945
SIGINT for PID= 59944
ctrl + c
【讨论】:
谢谢@Alex。参考链接应该指向 ProcessPoolExecutor :) 忽略子进程 (signal.signal(signal.SIGINT, signal.SIG_IGN)
) 中的 SIGINT 而不是调用 sys.exit(0)
会更干净。如果子进程被脏终止,exe.shutdown()
可能会永远阻塞,而该进程永远不会返回以上是关于如何用 ProcessPoolExecutor 优雅地终止 loop.run_in_executor?的主要内容,如果未能解决你的问题,请参考以下文章
Day794.如何用协程来优化多线程业务 -Java 性能调优实战