如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?

Posted

技术标签:

【中文标题】如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?【英文标题】:How to terminate long-running computation (CPU bound task) in Python using asyncio and concurrent.futures.ProcessPoolExecutor? 【发布时间】:2021-03-29 00:46:09 【问题描述】:

类似问题(但答案对我不起作用):How to cancel long-running subprocesses running using concurrent.futures.ProcessPoolExecutor?

与上面链接的问题和提供的解决方案不同,在我的情况下,计算本身相当长(受 CPU 限制)并且不能循环运行以检查是否发生了某些事件。

以下代码的精简版:

import asyncio
import concurrent.futures as futures
import time

class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = asyncio.gather(*self._tasks)
            # Send the reasoner tasks to main monitor task
            asyncio.gather(self.sample_main_loop(_reasoner_tasks))
            self._loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self._loop.close()

    async def sample_main_loop(self, reasoner_tasks):
        """This is the main monitor task"""
        await asyncio.wait_for(reasoner_tasks, None)
        for task in self._long_running_tasks:
            try:
                await asyncio.wait_for(task, 10)
            except asyncio.TimeoutError:
                print("Oops. Some long operation timed out.")
                task.cancel()  # Doesn't cancel and has no effect
                task.set_result(None)  # Doesn't seem to have an effect

        self._lmz_executor.shutdown()
        self._loop.stop()
        print('And now I am done. Yay!')

    async def bot_reasoning_loop(self, bot):
        import math

        _exec_count = 0
        _sleepy_time = 15
        _max_runs = math.floor(self._max_execution_time / _sleepy_time)

        self._long_running_tasks.append(
            self._loop.run_in_executor(
                    self._lmz_executor, really_long_process, _sleepy_time))

        while time.monotonic() < self._max_execution_time:
            print("Bot#: thinking for s. Run /".format(
                    bot, _sleepy_time, _exec_count, _max_runs))
            await asyncio.sleep(_sleepy_time)
            _exec_count += 1

        print("Bot# Finished Thinking".format(bot))

def really_long_process(sleepy_time):
    print("I am a really long computation.....")
    _large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: ".format(_large_val))

if __name__ == "__main__":
    sim = Simulator()
    sim.initialise()
    sim.run()

这个想法是有一个主模拟循环运行和监控三个机器人线程。然后,这些机器人线程中的每一个都会执行一些推理,但也会使用ProcessPoolExecutor 启动一个非常长的后台进程,这可能最终会运行更长的阈值/最大执行时间来进行推理。

正如您在上面的代码中看到的那样,当发生超时时,我尝试.cancel() 这些任务。虽然这并没有真正取消实际的计算,它一直在后台发生,asyncio 循环直到所有长时间运行的计算完成后才会终止。

如何在方法中终止这种长时间运行的 CPU 密集型计算?

其他类似的 SO 问题,但不一定相关或有帮助:

    asyncio: Is it possible to cancel a future been run by an Executor? How to terminate a single async task in multiprocessing if that single async task exceeds a threshold time in Python Asynchronous multiprocessing with a worker pool in Python: how to keep going after timeout?

【问题讨论】:

【参考方案1】:

如何在方法中终止这种长时间运行的 CPU 密集型计算?

您尝试的方法不起作用,因为ProcessPoolExecutor 返回的期货不可取消。虽然 asyncio 的 run_in_executor tries 传播取消,但一旦任务开始执行,它只是 ignored by Future.cancel

没有根本原因。与线程不同,进程可以安全地终止,因此ProcessPoolExecutor.submit 完全有可能返回cancel 终止相应进程的未来。 Asyncio 协程具有明确定义的取消语义,并且可以自动使用它。不幸的是,ProcessPoolExecutor.submit 返回一个常规的concurrent.futures.Future,它假定底层执行器的最小公分母,并将运行的未来视为不可触及。

因此,要取消在子进程中执行的任务,必须完全绕过ProcessPoolExecutor 并管理自己的进程。挑战在于如何在不重新实现一半 multiprocessing 的情况下做到这一点。标准库提供的一个选项是(ab)使用multiprocessing.Pool 用于此目的,因为它支持可靠地关闭工作进程。 CancellablePool 可以按如下方式工作:

不是生成固定数量的进程,而是生成固定数量的 1-worker 池。 从异步协程将任务分配给池。如果协程在另一个进程中等待任务完成时被取消,terminate 单进程池并创建一个新的。 由于一切都是从单个异步线程协调的,因此不必担心竞争条件,例如意外杀死已经开始执行另一个任务的进程。 (如果要在 ProcessPoolExecutor 中支持取消,则需要防止这种情况。)

这是该想法的示例实现:

import asyncio
import multiprocessing

class CancellablePool:
    def __init__(self, max_workers=3):
        self._free = self._new_pool() for _ in range(max_workers)
        self._working = set()
        self._change = asyncio.Event()

    def _new_pool(self):
        return multiprocessing.Pool(1)

    async def apply(self, fn, *args):
        """
        Like multiprocessing.Pool.apply_async, but:
         * is an asyncio coroutine
         * terminates the process if cancelled
        """
        while not self._free:
            await self._change.wait()
            self._change.clear()
        pool = usable_pool = self._free.pop()
        self._working.add(pool)

        loop = asyncio.get_event_loop()
        fut = loop.create_future()
        def _on_done(obj):
            loop.call_soon_threadsafe(fut.set_result, obj)
        def _on_err(err):
            loop.call_soon_threadsafe(fut.set_exception, err)
        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)

        try:
            return await fut
        except asyncio.CancelledError:
            pool.terminate()
            usable_pool = self._new_pool()
        finally:
            self._working.remove(pool)
            self._free.add(usable_pool)
            self._change.set()

    def shutdown(self):
        for p in self._working | self._free:
            p.terminate()
        self._free.clear()

显示取消的简约测试用例:

def really_long_process():
    print("I am a really long computation.....")
    large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: ".format(large_val))

async def main():
    loop = asyncio.get_event_loop()
    pool = CancellablePool()

    tasks = [loop.create_task(pool.apply(really_long_process))
             for _ in range(5)]
    for t in tasks:
        try:
            await asyncio.wait_for(t, 1)
        except asyncio.TimeoutError:
            print('task timed out and cancelled')
    pool.shutdown()

asyncio.get_event_loop().run_until_complete(main())

请注意 CPU 使用率如何从未超过 3 个内核,以及它如何在接近测试结束时开始下降,这表明进程正在按预期终止。

要将其应用于问题中的代码,请将self._lmz_executor 设为CancellablePool 的实例并将self._loop.run_in_executor(...) 更改为self._loop.create_task(self._lmz_executor.apply(...))

【讨论】:

这太棒了,可以按我的意愿工作。遗憾的是,asyncio 库实际上并没有终止在 ProcessPoolExecutor 中运行的进程。这对 ThreadPoolExecutor 有意义。我可能会为 Python 邮件列表保留这样的讨论。谢谢。 @Darkfish 我认为这对 Python 错误跟踪器来说是一个有价值的建议。 (尽管实际上它可能会被忽略,除非附带实现它的补丁。)另外,在生产中使用 tihs 之前要小心。我的第一次尝试是使用ProcessPoolExecutor 作为底层池,但是当有人用挂起的任务终止池时,该类非常不高兴 - 它的管理线程开始引发“错误的文件描述符”和类似的异常,他们无法被抓住。 multiprocessing 优雅地处理它,但我不确定是否有人真正测试过。 很高兴知道。现在我只是在实现一个研究原型,所以我想在发布之前必须对其进行彻底的测试。也许当我结束了这个,我会考虑修补或至少开始讨论 Python 列表。谢谢老兄。

以上是关于如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?的主要内容,如果未能解决你的问题,请参考以下文章

concurrent.futures 和 asyncio.futures 有啥区别?

从 concurrent.futures 到 asyncio

concurrent.futures.Future 可以转换为 asyncio.Future 吗?

为啥 asyncio.Future 与 concurrent.futures.Future 不兼容?

Concurrency with asyncio

asyncio yield from concurrent.futures.Future of an Executor