锁对象只能通过继承在进程之间共享

Posted

技术标签:

【中文标题】锁对象只能通过继承在进程之间共享【英文标题】:Lock objects should only be shared between processes through inheritance 【发布时间】:2021-12-22 17:00:18 【问题描述】:

我在一个对象中使用 multiprocessing.Pool 类并尝试以下操作:

from multiprocessing import Pool, Lock

class myobject:
    def __init__(self):
        self.Lock = Lock()
        self.file = open('someiterablefile')
    def function(self):
        self.lock.acquire()
        g = getNext(self.file)
        self.lock.release()
        return g
      
    def anotherfunction(self):
        pool = Pool()
        results = pool.map(self.function, range(10000))
        pool.close()
        pool.join()
        return results

但是,我收到一个运行时错误,指出。我对python和多线程相当陌生。我怎样才能走上正轨?

【问题讨论】:

始终将完整的错误消息(从单词“Traceback”开始)作为文本(不是屏幕截图,不是链接到外部门户)有问题(不是评论)。还有其他有用的信息。 也许你应该将self.Lock 发送到self.function 作为第二个参数。 您在函数定义中忘记了self。 - def function(self), def anotherfunction(self) 【参考方案1】:

您最终需要对您发布的代码进行很多更改:

    您已将您的类命名为object。这是一个内置的 Python 类,所有 Python 对象都从该类派生。 你永远不应该重新定义内置类。 方法functionanotherfunction 缺少参数self,但它们在它们的主体中引用变量self。这段代码永远不可能运行。 function 还需要接受第二个参数。 在方法function 中,您有self.lock.acquire 而不是self.lock.acquire()。所以你实际上并没有调用这个方法。以同样的方法引用getNextself.variable。这两个都没有定义。这不是minimal, reproducible example。 我不确定self.variable 应该是什么。但除非它是在共享内存中分配的东西或者是一个托管对象,例如通过调用multiprocessing.Manager().dict()创建的,当这个对象被序列化/反序列化到每次调用function时,进程池中的进程,它将始终使用调用map之前的初始值,并且函数function对该属性所做的任何更改都将在其中的副本中完成子进程的地址空间,并且永远不会被反射回主进程的副本。

因此,如果您克服了最初的错误,则需要解决所有其他问题。但是让我们来解决您的错误消息,为什么不可否认的是非常不具信息性。

multiprocessing.Lock 实例可以是multiprocessing.Process 的子类的属性。当主进程创建此类类的实例时,作为该实例属性的锁存在于主进程的地址空间中。当在实例上调用方法start 并且结果调用实例的run 方法时,必须将锁序列化/反序列化到子进程正在运行的新地址空间。这按预期工作:

from multiprocessing import Process, Lock

class P(Process):
    def __init__(self, *args, **kwargs):
        Process.__init__(self, *args, **kwargs)
        self.lock = Lock()

    def run(self):
        print(self.lock)

if __name__ == '__main__':
    p = P()
    p.start()
    p.join()

打印:

<Lock(owner=None)>

不幸的是,这在您处理多处理池时不起作用。在您的object 实例中,self.lock 是通过__init__ 方法在主进程中创建的。但是当调用Pool.map 调用self.function 时,锁无法序列化/反序列化到将运行此方法的已运行池进程。

解决方案是使用设置为该锁值的全局变量来初始化多处理池中的每个进程(实际上现在没有必要将该锁作为类的属性)。这样做的方法是使用 Pool 构造函数 See the documentation 的 initializerinitargs 参数:

from multiprocessing import Pool, Lock

def init_pool_processes(the_lock):
    """
    Initialize each process with global variable lock.
    """
    global lock
    lock = the_lock

class Test:
    def function(self, i):
        lock.acquire()
        with open('test.txt', 'a') as f:
            print(i, file=f)
        lock.release()

    def anotherfunction(self):
        lock = Lock()
        pool = Pool(initializer=init_pool_processes, initargs=(lock,))
        pool.map(self.function, range(10))
        pool.close()
        pool.join()

if __name__ == '__main__':
    t = Test()
    t.anotherfunction()

【讨论】:

谢谢,我修复了您引用的前 4 个问题。对困惑感到抱歉。在您列出的代码中,为什么 lock 在 anotherfunction() 中实例化后在 init_pool_processes 中定义为全局? anotherfunction 正在主进程中执行,function 是一个“工作函数”,由于您在多处理池中的一个进程调用pool.map 而被调用创建并因此在完全不同的进程/地址空间中运行。您需要将lock 实例从一个地址空间传递到另一个地址空间,而在处理多处理池时,唯一的方法是为池中的每个进程初始化一个带锁的全局变量。这就是“通过继承”初始化的方式。阅读我发布的链接中的文档。 那么worker函数是类方法的静态副本吗? 在 Windows 上启动一个新进程会创建一个新的空地址空间并启动一个新的 Python 解释器,该解释器重新读取源文件,在新地址中重新创建类定义及其方法可以调用工作函数之前的空间。因此,如果这就是您所说的“类方法的静态副本”,那么是的。

以上是关于锁对象只能通过继承在进程之间共享的主要内容,如果未能解决你的问题,请参考以下文章

12.9 线程与fork

Python 在进程之间共享锁

37. Python 多进程锁 多进程共享内存

如何使用共享内存而不是通过多个进程之间的酸洗来传递对象

31互斥锁与进程间通信

Python学习第20篇:互斥锁以及进程之间的三种通信方式(IPC)以及生产者个消费者模型