如何使 ProcessPoolExecutor 中的任务表现得像守护进程?

Posted

技术标签:

【中文标题】如何使 ProcessPoolExecutor 中的任务表现得像守护进程?【英文标题】:How to make tasks in ProcessPoolExecutor behave like daemon process? 【发布时间】:2019-05-21 11:39:52 【问题描述】:

Python 3.6.6

代码如下:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


executor_processes = ProcessPoolExecutor(2)


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    loop_ = asyncio.get_event_loop()
    loop_.run_in_executor(executor_processes, calculate)
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")

输出:

同时 完成睡眠 main_thread 已完成 而 而 ...

我希望子进程将被终止,就像使用守护进程属性生成进程时一样:

import asyncio
import time
import multiprocessing


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    proc = multiprocessing.Process(target=calculate)
    proc.daemon = True
    proc.start()
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")

输出:

同时 完成睡眠 main_thread 已完成

问题:如何将loop_.run_in_executor(executor_processes, calculate) 的行为改为“类似守护进程”?

【问题讨论】:

【参考方案1】:

您展示的代码显然只是一个小示例,用于展示您希望实现的目标。我们不知道您的实际任务/问题。但老实说,我不相信你是在正确的道路上。

ProcessPoolExecutorconcurrent.futures 标准库包的一部分。它在调用submit() 时向调用者返回FutureFuture 是尚未完成的计算结果的代理。这是一个承诺;尽管在这种情况下,该术语在技术上并不完全正确。请参阅Wiki page 了解区别。

这意味着计算预计在有限时间内完成并产生结果。这就是为什么 Python 中的 ThreadPoolExecutorProcessPoolExecutor 实现不允许您产生恶魔工人。要求一个您实际上并不希望实现的结果的承诺没有多大意义。

你怎样才能实现你的目标?

1 - 子类 ProcessPoolExecutor? 您可以拦截新进程的创建和启动以潜入 _adjust_process_count() 中的 p.daemon = True。然而,由于concurrent.futures 的设计并没有考虑到无限期运行的任务,所以这并没有多大帮助。与multiprocessing 不同,concurrent.futures.process 定义了一个不考虑守护进程的exit handler。它只是尝试join() 一切,这可能需要一些时间进行无限循环。

2 - 定义您自己的退出处理程序!您可以同时执行 multiprocessingconcurrent.futures.process 所做的事情:定义一个退出处理程序,当您的 Python 进程即将结束时进行清理去关机。 atexit 可以提供帮助:

import atexit

executor_processes = ProcessPoolExecutor(2)

def calculate():
    while True:
        print("while")
        time.sleep(1)

def end_processes():
    [proc.terminate() for proc in multiprocessing.active_children()]

async def async_method():
    [...]

if __name__ == '__main__':
    atexit.register(end_processes)
    loop = asyncio.get_event_loop()
    [...]

注意:这将终止所有在进程结束时处于活动状态的子进程。如果您想要优雅地关闭子进程,请保留句柄并在代码中的指令结束之前执行此操作。 另请注意,进程可以拒绝遵守terminate()kill() 是你最后的手段。

【讨论】:

以上是关于如何使 ProcessPoolExecutor 中的任务表现得像守护进程?的主要内容,如果未能解决你的问题,请参考以下文章

如何选择 ProcessPoolExecutor 进程的 python 解释器?

如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?

如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?

如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?

Python:concurrent.futures 如何使其可取消?

如何让 concurrent.futures ProcessPoolExecutor 与字典一起工作?