Python 进程池非守护进程?

Posted

技术标签:

【中文标题】Python 进程池非守护进程?【英文标题】:Python Process Pool non-daemonic? 【发布时间】:2011-10-21 22:15:52 【问题描述】:

是否可以创建一个非守护进程的 python 池?我希望一个池能够调用一个内部有另一个池的函数。

我想要这个,因为守护进程无法创建进程。具体会导致报错:

AssertionError: daemonic processes are not allowed to have children

例如,假设function_a 有一个运行function_b 的池,而该池有一个运行function_c 的池。此函数链将失败,因为function_b 正在守护进程中运行,而守护进程无法创建进程。

【问题讨论】:

AFAIK,不,不可能池中的所有工作人员都被守护,并且不可能 注入依赖项,顺便说一句,我不明白你问题的第二部分I want a pool to be able to call a function that has another pool inside 以及它如何干扰工人被守护的事实。 因为如果函数 a 有一个运行函数 b 的池,而函数 b 有一个运行函数 c 的池,那么 b 存在一个问题,即它正在守护进程中运行,而守护进程无法创建进程。 AssertionError: daemonic processes are not allowed to have children 不要导入为from multiprocessing import Pool,而是使用from concurrent.futures import ProcessPoolExecutor as Pool 【参考方案1】:

从 Python 3.8 开始,concurrent.futures.ProcessPoolExecutor 没有此限制。它可以有一个嵌套的进程池完全没有问题:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'pid()= i= square=')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'pid()=, i=, j= sum_squares=')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'pid()= sum_squares=')

if __name__ == "__main__":
    main()

以上演示代码使用 Python 3.8 测试。

但是,ProcessPoolExecutor 的一个限制是它没有maxtasksperchild。如果您需要这个,请考虑使用answer by Massimiliano。

信用:answer by jfs

【讨论】:

这显然是最好的解决方案,因为它只需要很少的改动。 完美运行! ...作为旁注使用孩子 - multiprocessing.PoolProcessPoolExecutor.Pool 内也是可能的! 不幸的是这对我不起作用,仍然得到daemonic processes are not allowed to have children @RoyShilkrot 您使用的是哪个版本的 Python? python 3.7。问题是这是从 Celery 运行的,我不得不使用 import billiard as multiprocessing 并使用他们的 Pool【参考方案2】:

我遇到的问题是尝试在模块之间导入全局变量,导致 ProcessPool() 行被多次评估。

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     
    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

然后从代码中的其他地方安全导入

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

我在这里围绕pathos.multiprocessing 编写了一个更扩展的包装类:

https://github.com/JamesMcGuigan/python2-timeseries-datapipeline/blob/master/src/util/MultiProcessing.py

附带说明,如果您的用例只需要异步多进程映射作为性能优化,那么 joblib 将在后台管理您的所有进程池并允许使用这种非常简单的语法:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
https://joblib.readthedocs.io/

【讨论】:

【参考方案3】:

当错误似乎是误报时,这提供了一种解决方法。与noted by James 一样,这可能发生在来自守护进程的无意导入

例如,如果您有以下简单代码,WORKER_POOL 可能会无意中从工作人员那里导入,从而导致错误。

import multiprocessing

WORKER_POOL = multiprocessing.Pool()

一种简单但可靠的解决方法是:

import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://***.com/a/63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

在上述解决方法中,MyClass.worker_pool 可以使用而不会出现错误。如果您认为这种方法可以改进,请告诉我。

【讨论】:

【参考方案4】:

我不得不在 Python 3.7 中使用非守护程序池,并最终调整了已接受答案中发布的代码。下面是创建非守护池的 sn-p:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

由于multiprocessing 的当前实现已被广泛重构为基于上下文,我们需要提供一个NoDaemonContext 类,该类具有我们的NoDaemonProcess 作为属性。然后NestablePool 将使用该上下文而不是默认上下文。

也就是说,我应该警告这种方法至少有两个警告:

    它仍然取决于multiprocessing 包的实现细节,因此随时可能中断。 multiprocessing 使非守护进程的使用变得如此困难是有正当理由的,其中许多已在here 中得到解释。我认为最引人注目的是:

至于允许子线程使用 子进程冒着创建一小群僵尸的风险 'grandchildren' 如果父线程或子线程之前终止 子进程完成并返回。

【讨论】:

关于警告:我的用例是并行化任务,但孙子将信息返回给他们的父母,而他们的父母又将信息返回给他们的父母之后 做一些必要的本地处理。因此,每个级别/分支都明确等待其所有叶子。如果您明确地必须等待生成的进程完成,该警告是否仍然适用? 你会麻烦添加如何使用它而不是 multiprocessing.pool 吗? “您现在可以互换使用 multiprocessing.Pool 和 NestablePool”。【参考方案5】:

在某些 Python 版本中,将标准池替换为自定义可能会引发错误:AssertionError: group argument must be None for now

Here我找到了可以提供帮助的解决方案:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc

【讨论】:

【参考方案6】:

我见过有人使用celerymultiprocessing 的分支billiard(多处理池扩展)来处理这个问题,它允许守护进程产生子进程。解决方法是简单地将 multiprocessing 模块替换为:

import billiard as multiprocessing

【讨论】:

【参考方案7】:

multiprocessing 模块有一个很好的接口来使用带有进程线程的池。根据您当前的用例,您可能会考虑将multiprocessing.pool.ThreadPool 用于您的外部池,这将导致线程(允许从内部生成进程),而不是进程。

它可能受到 GIL 的限制,但在我的特殊情况下(我测试了两者),从外部 Pool 创建的进程 here 的启动时间远远超过了解决方案ThreadPool


Processes 换成Threads 真的很容易。详细了解如何使用ThreadPool 解决方案here 或here。

【讨论】:

谢谢 - 这对我帮助很大 - 在这里很好地使用了线程(以产生实际执行良好的进程) 对于寻找可能适用于他们情况的实用解决方案的人来说,这就是解决方案。 用户选择 process 池可能是 CPU-bound 和/或需要可取消的任务,因此线程不是一个选项。这并不能真正回答问题。【参考方案8】:

multiprocessing.pool.Pool 类在其__init__ 方法中创建工作进程,使它们成为守护进程并启动它们,并且在它们启动之前无法将它们的daemon 属性重新设置为False(并且之后不再允许)。但是您可以创建自己的 multiprocesing.pool.Pool 子类(multiprocessing.Pool 只是一个包装函数)并替换您自己的 multiprocessing.Process 子类,它始终是非守护进程,用于工作进程。

以下是如何执行此操作的完整示例。重要的部分是顶部的 NoDaemonProcessMyPool 两个类,最后在 MyPool 实例上调用 pool.close()pool.join()

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

【讨论】:

我刚刚在 Linux 和 Python 2.6/2.7/3.2 OS X 上使用 Python 2.7/3.2(修复“打印”行之后)再次测试了我的代码。在 OS X 上运行 Linux 和 Python 2.7/3.2很好,但代码确实与 OS X (Lion) 上的 Python 2.6 一起挂起。这似乎是多处理模块中的一个错误,已修复,但我还没有真正检查过错误跟踪器。 谢谢!在 Windows 上,您还需要致电 multiprocessing.freeze_support() 干得好。如果有人因此而发生内存泄漏,请尝试使用“with closing(MyPool(processes=num_cpu)) as pool:”来正确处理池 使用MyPool而不是默认的Pool有什么缺点?换句话说,为了换取启动子进程的灵活性,我要付出什么代价? (如果没有成本,大概标准的Pool 会使用非守护进程)。 @machen 是的,不幸的是,这是真的。在 Python 3.6 中,Pool 类已被广泛重构,因此Process 不再是一个简单的属性,而是一个方法,它返回从 context 获取的流程实例。我尝试覆盖此方法以返回 NoDaemonPool 实例,但这会导致使用 Pool 时出现异常 AssertionError: daemonic processes are not allowed to have children

以上是关于Python 进程池非守护进程?的主要内容,如果未能解决你的问题,请参考以下文章

python基础-守护进程守护线程守护非守护并行

041.Python守护进程,锁信号量和事件

python下编写守护进程

python学习笔记——守护进程

python之守护进程

Python全栈之路模块----之-----守护进程进程锁队列生产者消费者模式