在 multiprocessing.Manager.dict() 中更新对象

Posted

技术标签:

【中文标题】在 multiprocessing.Manager.dict() 中更新对象【英文标题】:Update object inside a multiprocessing.Manager.dict() 【发布时间】:2020-07-29 14:21:37 【问题描述】:

我想知道如何更新分配为不同进程之间共享字典值的对象。我有以下课程:


class Task:

    STATUS_PROCESSING = 0
    STATUS_EXECUTING = 1
    STATUS_QUEUED = 2
    STATUS_TERMINATED = 3
    STATUS_HALTED = 4
    STATUS_STOPPED = 5

    def __init__(self, id: str, uuid: str, options: dict):
        self.id = id
        self.uuid = uuid
        self.options = options
        self.state = 0

    # Some properties...

    def execute(self):
        """ Executes the task
        """
        # Set self status to Executing
        self.state = Task.STATUS_EXECUTING

        print('Executing...')

        self.state = Task.STATUS_TERMINATED

它只是创建一个具有给定 ID 的新任务,并在调用 execute() 时执行其核心方法。我有另一个带有 staticmethods 的类,用于将一个新的对 (id, task) 附加到 dict,并读取执行其所有任务的 dict,直到主程序停止:

class DummyList:

    @staticmethod
    def submit_task(d: dict, uuid: str, options: dict):
        """ Submit a new task
        """
        # If invalid UUID
        if not Task.is_valid_uuid(uuid):
            return False

        # If more than 20 tasks
        if len(d) > 19:
            return False

        # Create random ID (simplified for question)
        r_id = str(random.randint(1, 2000000))
        if r_id in d:
            return False

        # Add task to the dictionary
        d[r_id] = Task(r_id, uuid, options)

        # Set status to queue
        d[r_id].state = Task.STATUS_QUEUED

        # Return the created ID
        return r_id

    @staticmethod
    def execute_forever(d):
        try:
            while True:
                for i in d.values():
                    print(i.state)
                    i.execute()
                time.sleep(5)
        except KeyboardInterrupt:
            pass

问题是DummyList.execute_forever() 将从另一个进程中调用,而主进程将执行submit_task(...) 函数以添加新任务。像这样:

        # Create a shared dict
        m = multiprocessing.Manager()
        shared_d = m.dict()

        # Start the Task shared list execution in another process
        p = multiprocessing.Process(target=DummyList.execute_forever, args=(shared_d,))
        # Set the process to exit when the main halts
        p.daemon = True
        p.start()

        ........


       # From another place
       # The message variable is not important
       DummyList.submit_task(shared_d, message['proc'], message['options'])

有效!任务已创建、分配给字典并执行,但以下行(在上面的代码中可以看到)没有正确执行:

self.state = Task.STATUS_EXECUTING
self.state = Task.STATUS_TERMINATED
d[r_id].state = Task.STATUS_QUEUED

如果我们尝试在整个代码中写入ìf shared_d[<some_id>].state == 0,它将始终为True,因为属性不会更新

我想这是因为当对象属性被修改时共享字典没有更新,可能是因为字典只知道他必须更新他的 getitemsetitem 方法被称为。您知道是否有任何方法可以改变这种行为?

非常感谢!

【问题讨论】:

【参考方案1】:

我终于找到了解决办法。除非调用代理字典中的 __getitem____setitem__ 方法,否则字典中的对象不会更新。这就是为什么我更改了以下几行:

任务

execute() 方法以return self 结尾。 self.state 必须在整个执行过程中更改。

任务管理器

方法改为:

@staticmethod
    def execute_forever(d):
        """ Infinite loop reading the queued tasks and executing all of them.
        """
        try:
            while True:
                # Notice the loop using the keys
                for i in d.keys():
                    # Execute and re-assign item
                    d[i] = d[i].execute()
                time.sleep(5)
        except KeyboardInterrupt:
            pass

【讨论】:

以上是关于在 multiprocessing.Manager.dict() 中更新对象的主要内容,如果未能解决你的问题,请参考以下文章

multiprocessing.Manager共享内存的问题记录

Ubuntu、cx_Freeze 和 multiprocessing.Manager() 在“spawn”类型进程的情况下发生冲突

进程用manager 和Queue 实现进程消费者生产者

在python中同步两个共享对象的读/写操作

Python多进程(multiprocessing)共享变量

python进程之间修改数据[Manager]与进程池[Pool]