Python - 如何从打印信息的单独进程中实时读取类实例

Posted

技术标签:

【中文标题】Python - 如何从打印信息的单独进程中实时读取类实例【英文标题】:Python - How to read a class instances in real time from a separate process that is printing the information 【发布时间】:2018-11-29 14:05:11 【问题描述】:

我有一段代码不断创建 Car 类的新实例。这样做时,类 Car 正在创建自身实例的列表,因此当我想获取当前实例的信息时,我可以轻松地做到这一点,如下面的代码所示:

from multiprocessing import Process
import time
class Car:

    car_list = list()
    def __init__(self, id, model):
        self.id = id
        self.model = model
        Car.car_list.append(self)

    @classmethod
    def get_current_instances(cls):
        return Car.car_list


class Interface:

    def print_current_system(self):
        while True:
            print(len(Car.get_current_instances()))
            time.sleep(1)



if __name__ == "__main__":

    interface = Interface()
    model = ["Toyota", "BMW"]

    [Car(i, model[i]) for i in range(len(model))]

    print_process = Process(target=interface.print_current_system)
    print_process.start()

    Car(2345, "Tesla")
    print("from main process " + str(len(Car.get_current_instances()))) 

出于问题的目的,此代码已简化。但是,问题仍然存在。我正在从一个新进程调用一个函数 print_current_system。此函数不断查看 Car 的所有当前实例并打印汽车的数量。

当我启动这个进程,然后再添加一些新的 Car 实例时,这些实例对另一个子进程是隐藏的,而对主进程是完全可见的。我很确定我需要使用队列或管道之类的东西。但是,我不确定如何。 这是上面代码的输出:

2
from main process 3
2
2
2
2
2

【问题讨论】:

进程间不共享内存,例如需要通过队列同步 @Netwave 但这将如何运作?我是否必须将 Car 的所有实例放入队列中?另外,当我调用 queue.get() 时,队列的大小会减少一倍吗?我不知道这是如何工作的。 是的,而不是推送到列表,您需要将您拥有的进程数推送到队列中,以便所有人都可以使用它。 【参考方案1】:

背景: 因为 Python 本质上是单线程的(解释器由 GIL 或全局解释器锁保护),所以其中没有真正的线程。相反,要达到相同的效果,您必须使用不同的过程,就像您在示例中所做的那样。因为这些是不同的进程,具有不同的解释器和不同的命名空间,所以您将无法从不同的进程访问一个进程中的正常数据。当您创建新进程时,python 解释器会分叉自己并复制所有 Python 对象,因此 Car.car_list 现在是两个不同的对象,每个进程中都有一个。因此,当一个进程添加到该列表时,它添加到的列表与另一个进程正在读取的列表不同。

答案:您的预感是正确的:您将希望使用多处理模块中的一种数据结构。这些数据结构被专门编写为“线程安全”(我猜在这种情况下实际上是“进程安全”)并在后台编组两个进程之间的共享数据。

示例:您可以使用全局队列,其中“生产者”进程添加项目,“消费者”进程删除它们并将它们添加到自己的列表副本中。

from multiprocessing import Queue

class Car:

    global_queue = Queue()
    _car_list = [] # This member will be up-to-date in the producer
                   # process. In that process, access it directly.
                   # In the consumer process, call get_car_list instead.
                   # This can be wrapped in an interface which knows
                   # which process it is in, so the calling code does
                   # not have to keep track.

    def __init__(self, id, model):
        self.id = id
        self.model = model
        self.global_queue.put(self)
        self._car_list.append(self)

    @classmethod
    def get_car_list(cls):
        """ Get the car list for the consumer process

            Note: do not call this from the producer process
        """
        # Before returning the car list, pull all pending cars off the queue
        # while cls.global_queue.qsize() > 0:
        # qsize is not implemented on some unix systems
        while not cls.global_queue.empty():
            cls._car_list.append(cls.global_queue.get())
        return cls._car_list

注意:使用上面的代码,您只能有一个数据消费者。如果其他进程调用 get_car_list 方法,它们将从队列中删除待处理的汽车,而消费者进程将不会收到它们。如果您需要拥有多个消费者进程,则需要采用不同的方法。

【讨论】:

我只需要一个来自它自己线程的消费者。但是我不需要,偶尔从主进程中获取汽车列表。也许这不是问题,因为我不需要队列,在这种情况下,我可以从不同的类方法中获取汽车列表,对吧? 是的,您可以从任一进程中获取汽车列表。我将更新我的示例以说明这一点。 哦,我想你的意思是写 Car.global_queue.put(self),对吧?而不是 self.global_queue.put(self)。 两者都是有效的,因为 global_queue 是一个类级别的成员。如果类的名称发生变化,使用“self”更易于维护(类似于我在 get_car_list 方法中使用“cls”的方式),但您可能会发现使用“Car._car_list”更具可读性。这是一种风格选择。 我尝试了你的建议,但它给出了一个错误:return self._maxsize - self._sem._semlock._get_value() NotImplementedError。错误行是 'while cls.global_queue.qsize() >0:' 这是获取队列大小的正确方法吗? ***.com/questions/41952413/…【参考方案2】:

如果您只想计算您拥有的汽车数量,您可以使用Value 之类的共享内存对象。

您只需对代码进行一些更改即可实现您想要的:

from multiprocessing import Process, Value
import time

class Car:

    car_list = list()
    car_quantity = Value('i', 0)     # Use a shared memory object here.

    def __init__(self, id, model):
        self.id = id
        self.model = model
        Car.car_list.append(self)
        Car.car_quantity.value += 1  # Update quantity

    @classmethod
    def get_current_instances(cls):
        return Car.car_list


class Interface:

    def print_current_system(self):
        while True:
            print(Car.car_quantity.value)  # Just print the value of the memory shared object (Value).
            time.sleep(1)



if __name__ == "__main__":

    interface = Interface()
    model = ["Toyota", "BMW"]

    [Car(i, model[i]) for i in range(len(model))]

    print_process = Process(target=interface.print_current_system)
    print_process.start()

    time.sleep(3)   # Added here in order you can see the 
                    # ouptut changing from 2 to 3.

    Car(2345, "Tesla")
    print("from main process " + str(len(Car.get_current_instances()))) 

输出:

2
2
2
from main process 3
3
3
3
3
3

【讨论】:

不,我也可以使用对象内部的内容。

以上是关于Python - 如何从打印信息的单独进程中实时读取类实例的主要内容,如果未能解决你的问题,请参考以下文章

在没有flush()和新行的子进程输出上进行非阻塞读取

如何在 Python 中实现机会/概率? [复制]

如何在ASP.NET MVC中实现报表打印和导出

如何从 websocket(客户端)打印流信息?

如何在多进程系统中实现锁定?

在python中实现格式化输出的方法