如何使用多处理将对象从主进程传递到子进程

Posted

技术标签:

【中文标题】如何使用多处理将对象从主进程传递到子进程【英文标题】:How to pass object from main process to child process using multiprocessing 【发布时间】:2019-09-04 13:26:06 【问题描述】:

我尝试在一个类中使用 Multiprocessing。我使用 Multiprocessing.pipe() 将实例 o 从父进程传递到子进程。

        self.listofwritetags = self.collectwritetaglist()
        self.progressbar['value'] = 20
        self.frame.update_idletasks()
        self.alldevice = alldevices_V3.AllDevices(self.comm_object)
        self.progressbar['value'] = 40
        self.frame.update_idletasks()

        con1,con2 = multiprocessing.Pipe()
        con1.send(self.alldevice)
        con2.send(self.comm_object)        


        # Multithreading section
        # self.callmotor1dprocess = thread_with_trace(target=self.callallmotor1d,args=(self.comm_object,self.alldevice))
        self.callmotor1dprocess = multiprocessing.Process(target = self.callallmotor1d,args= (con1,con2))
        self.listofthread.append(self.callmotor1dprocess)



        self.button2.config(text="Initialized")
        initial = "True"
        self.progressbar.stop()

现在我调用所有的多处理启动

def startprocess(self):
        for item in self.listofthread:
            item.start()
        self.button3.config(text="started")

   def stopprocess(self):
        for item in self.listofthread:
            item.kill()

我在类内调用此代码。现在我应该在类外调用已执行的方法。

def callallmotor1d(con1,con2):
comobject = con1.recv()
devices = con2.recv()
while True:
    Allmotorprocessing.process(comobject, devices) 

但是我得到了一个很常见的错误:- 错误信息:-

Traceback(最近一次调用最后一次): _feed 中的文件“C:\Users\misu01\AppData\Local\Programs\Python\Python37\lib\multiprocessing\queues.py”,第 236 行 obj = _ForkingPickler.dumps(obj) 转储中的文件“C:\Users\misu01\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py”,第 51 行 cls(buf, 协议).dump(obj) TypeError:无法腌制 _thread.lock 对象 回溯(最近一次通话最后): _feed 中的文件“C:\Users\misu01\AppData\Local\Programs\Python\Python37\lib\multiprocessing\queues.py”,第 236 行 obj = _ForkingPickler.dumps(obj) 转储中的文件“C:\Users\misu01\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py”,第 51 行 cls(buf, 协议).dump(obj) TypeError: can't pickle _thread.lock objects

我不知道为什么要创建 thread.lock 对象。 为了避免这个错误,我尝试像这样修改我的 Alldevices 类和 comm_object 类:-

这是我的修改:-

类 AllDevices:

def __init__(self,comobject):
    self.mylock = threading.Lock()
    self.comobject = comobject
    self.dfM1D = pd.read_excel(r'C:\OPCUA\Working_VF1_5.xls', sheet_name='Motor1D')
    self.allmotor1dobjects = callallmotor1D_V3.Cal_AllMotor1D(self.dfM1D, self.comobject)


def __getstate__(self):
    state = vars(self).copy()
    # Remove the unpicklable entries.
    del state['mylock']
    return state

def __setstate__(self, state):
    # Restore instance attributes.
    vars(self).update(state)

这里是 comobject 类。

class General():
def __init__(self):
    self.client = Communication()
    self.mylock = threading.Lock()
    self.sta_con_plc = self.client.opc_client_connect()
    self.readgeneral = ReadGeneral(self.client.PLC)
    self.writegeneral = WriteGeneral(self.client.PLC)

def __getstate__(self):
    state = vars(self).copy()
    # Remove the unpicklable entries.
    del state['mylock']
    return state

def __setstate__(self, state):
    # Restore instance attributes.
    vars(self).update(state)

但我还是遇到了错误。

这是我的实现是否正确?

self.allmotor1dobjects = callallmotor1D_V2.Cal_AllMotor1D(self.dfM1D, self.comobject,self.logger)

这里 self.allmotor1dobjects 也是类实例。

喜欢:-

    self.client = Communication()        
    self.readgeneral = ReadGeneral(self.client.PLC)
    self.writegeneral = WriteGeneral(self.client.PLC)

这些也是类实例。

我从未使用过 thread.lock 这两个类中的任何一个。 我不知道它是如何创建的。

根据文档建议https://docs.python.org/3/library/pickle.html#pickling-class-instances 如果我使用 getstatesetstate,它应该会消除此错误。

在我的情况下它不起作用。

如何消除此错误。

我们将非常感谢您在这方面的任何帮助

【问题讨论】:

什么是devices?好像它们包含线程锁作为属性,不能腌制。 devices 是一个类对象......类名是 alldevices 可以初始化一些基本配置的东西。所以我不能通过管道传递一个类的对象 通常可以,但如果它们具有不可腌制的实例属性并且没有覆盖腌制行为以避免腌制它们,则不能。我已经发布了包含一些可能修复的答案,但对alldevice 或您的具体用例一无所知,我无法给出具体答案。 @SM2019 问题不在于你的对象是一个“类对象”(它不是,这意味着它是一个类,你的意思是一个 instance类,但 everything 是 Python 中类的实例)。问题是你的对象不能被腌制。 @ juanpa.arrivillaga ,谢谢你我现在明白了。 【参考方案1】:

您可以通过管道传递 大多数 类,但如果实例具有不可提取类型的属性,则不能。在这种情况下,alldevice 的实例(其实例存储在您存储为self.devices 的某个集合中)直接或间接具有threading.Lock 属性(错误消息显示_thread.lock,因为在所有抽象,这是 threading.Lock 返回的实例的实际类。

threading.Lock 不可选择(因为线程锁仅在给定进程中才有意义;即使您在另一个进程中重新创建它们,它们实际上也不会在进程之间提供任何类型的同步)。

如果alldevice 类在您的控制之下,您有几个选择:

    如果可能,请删除每个实例 threading.Lock(最简单的解决方案,但假设不需要同步,或者可以通过共享全局锁完成同步) 如果需要同步,并且必须跨进程操作,但不必针对每个实例,并且您使用的是类 UNIX 系统(Linux、BSD),则可以使用共享全局 multiprocessing.Lock() 代替;当Process 调用触发fork 时,它将被继承

    如果需要同步,并且您必须有一个每个实例的Lock(并且锁可以跨进程操作也可以),您可以将threading.Lock 替换为pickle 友好的multiprocessing.Manager 的@987654336 @。您需要在某处创建一个通用的multiprocessing.Manager() 实例(例如全局,或作为alldevice 的类属性),然后使用该管理器的Lock 而不是threading。简单例子:

    import multiprocessing
    
    class alldevice:
        MANAGER = multiprocessing.Manager()  # Shared manager for all alldevice instances
    
        def __init__(self, ... other args here ...):
            self.mylock = self.MANAGER.Lock()  # Make picklable lock
            # ... rest of initialization ...
    

    当不涉及multiprocessing 时,这将比threading.Lock 慢,因为它需要IPC 来锁定和解锁(ManagerLock 实例实际上是一个代理对象,在它实际运行的任何进程上与Manager 通信),并且它将跨进程锁定(如果您要锁定对无法从多个进程同时使用的实际硬件的访问,这可能是最好的),但它是相对的很简单。

    如果需要同步,但仅限于进程内,而不是跨进程,您可以控制酸洗过程以避免尝试酸洗threading.Lock,而是在另一侧取消酸洗时重新创建它。只需显式实现the pickle support methods 以避免酸洗Lock,并强制在另一侧重新创建它。示例:

    import copy
    
    class alldevice:
        def __init__(self, ... other args here ...):
            self.mylock = threading.Lock()  # Use regular lock
            # ... rest of initialization ...
    
        def __getstate__(self):
            state = vars(self).copy()  # Make copy of instance dict
            del state['mylock']        # Remove lock attribute
            return state
    
        def __setstate__(self, state):
            vars(self).update(state)
            self.mylock = threading.Lock()  # Make new lock on other side
    
        # If you ever make copies within the same thread, you may want to define
        # __deepcopy__ so the local copy process doesn't make a new lock:
        def __deepcopy__(self, memo):
            # Make new empty instance without invoking __init__
            newself = self.__class__.__new__(self.__class__)
            # Individually deepcopy attributes *except* mylock, which we alias
            for name, value in vars(self).items():
                # Cascading deepcopy for all other attributes
                if name != 'mylock':
                    value = copy.deepcopy(value, memo)
                setattr(newself, name, value)
            return newself
    

    __deepcopy__ 覆盖仅在您希望副本继续共享锁时才需要;否则,如果深层副本应表现为完全独立的实例,则可以省略它,最终会在副本中获得不相关的锁定。

如果您无法控制alldevice 类,但可以识别有问题的属性,您唯一的选择是为alldevice 注册一个copyreg 处理程序,以执行与选项#4 相同的基本操作,看起来像这样:

import copyreg

def unpickle_alldevice(state):
    self = alldevice.__new__(alldevice)  # Make empty alldevice
    vars(self).update(state)             # Update with provided state
    self.mylock = threading.Lock()       # Make fresh lock
    return self        

def pickle_alldevice(ad):
    state = vars(ad).copy()  # Make shallow copy of instance dict
    del state['mylock']      # Remove lock attribute
    return unpickle_alldevice, (state,)  # Return __reduce__ style info for reconstruction

# Register alternate pickler for alldevice
copyreg.pickle(alldevice, pickle_alldevice)

【讨论】:

我今天发现的 logger 造成了问题。实际上它创建了 thread.lock()。我使用 logger 在子进程期间记录事件。getstatesetsate 应该解决这个问题。

以上是关于如何使用多处理将对象从主进程传递到子进程的主要内容,如果未能解决你的问题,请参考以下文章

如何使用共享内存而不是通过多个进程之间的酸洗来传递对象

多处理进程对象中的字典解包

不断地将事件从主进程传递到渲染器进程

如何将ThreadLocal传递到子线程

多处理数据共享

将值从主窗体传递到子窗体