如何用 ProcessPoolExecutor 优雅地终止 loop.run_in_executor?

Posted

技术标签:

【中文标题】如何用 ProcessPoolExecutor 优雅地终止 loop.run_in_executor?【英文标题】:How to terminate loop.run_in_executor with ProcessPoolExecutor gracefully? 【发布时间】:2020-09-03 17:53:25 【问题描述】:

如何优雅地终止loop.run_in_executorProcessPoolExecutor?启动程序后不久,发送 SIGINT (ctrl + c)。

def blocking_task():
    sleep(3)

async def main():
    exe = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(exe, blocking_task) for i in range(3)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('ctrl + c')

max_workers 等于或小于任务数,一切正常。但是如果max_workers更大,上面代码的输出如下:

Process ForkProcess-4:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
ctrl + c

我想只捕获一次异常 (KeyboardInterrupt) 并忽略或静音进程池中的其他异常,但如何?


更新额外积分:

您能解释一下多重异常(原因)吗? 添加信号处理程序是否适用于 Windows? 如果没有,是否有在没有信号处理程序的情况下工作的解决方案?

【问题讨论】:

【参考方案1】:

您可以使用ProcessPoolExecutorinitializer parameter 在每个进程中为SIGINT 安装一个处理程序。

更新: 在 Unix 上,当创建进程时,它成为其父进程组的成员。如果您使用Ctrl+C 生成SIGINT,则信号正在发送到整个进程组。

import asyncio
import concurrent.futures
import os
import signal
import sys
from time import sleep


def handler(signum, frame):
    print('SIGINT for PID=', os.getpid())
    sys.exit(0)


def init():    
    signal.signal(signal.SIGINT, handler)


def blocking_task():
    sleep(15)


async def main():
    exe = concurrent.futures.ProcessPoolExecutor(max_workers=5, initializer=init)
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(exe, blocking_task) for i in range(2)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('ctrl + c')

Ctrl-C 开始后不久:

^CSIGINT for PID= 59942
SIGINT for PID= 59943
SIGINT for PID= 59941
SIGINT for PID= 59945
SIGINT for PID= 59944
ctrl + c

【讨论】:

谢谢@Alex。参考链接应该指向 ProcessPoolExecutor :) 忽略子进程 (signal.signal(signal.SIGINT, signal.SIG_IGN)) 中的 SIGINT 而不是调用 sys.exit(0) 会更干净。如果子进程被脏终止,exe.shutdown() 可能会永远阻塞,而该进程永远不会返回

以上是关于如何用 ProcessPoolExecutor 优雅地终止 loop.run_in_executor?的主要内容,如果未能解决你的问题,请参考以下文章

Day794.如何用协程来优化多线程业务 -Java 性能调优实战

如何用python实现算法,得到两个城市间的最优路径,综合考虑油费和过路费

第3章 如何用DAP仿真器下载程序

操作系统 如何用python批量修改文件创建时间

如何用pca做人脸识别 python实现

90%的人会遇到性能问题,如何用1行代码快速定位?