如何将相同的队列放入不同的多处理文件?

Posted

技术标签:

【中文标题】如何将相同的队列放入不同的多处理文件?【英文标题】:How do I get the same queue into differnet multiprocessing files? 【发布时间】:2021-09-30 04:37:07 【问题描述】:

我看到很多关于如何使用队列的教程,但它们总是显示它们在同一个文件中实现。我试图从一开始就很好地组织我的代码文件,因为我预计项目会变得非常大。如何获取我在主文件中初始化的队列以导入其他函数文件?

这是我的主文件:

import multiprocessing
import queue
from data_handler import data_handler
from get_info import get_memory_info
from get_info import get_cpu_info


if __name__ == '__main__':

    q = queue.Queue()

    getDataHandlerProcess = multiprocessing.Process(target=data_handler(q))
    getMemoryInfoProcess = multiprocessing.Process(target=get_memory_info(q))
    getCPUInfoProcess = multiprocessing.Process(target=get_cpu_info(q))

    getDataHandlerProcess.start()
    getMemoryInfoProcess.start()
    getCPUInfoProcess.start()

    print("DEBUG: All tasks successfully started.")

这是我的制作人:

import psutil
import struct
import time
from data_frame import build_frame


def get_cpu_info(q):
    while True:
        cpu_string_data = bytes('', 'utf-8')
        cpu_times = psutil.cpu_percent(interval=0.0, percpu=True)
        for item in cpu_times:
            cpu_string_data = cpu_string_data + struct.pack('<d',item)
        cpu_frame = build_frame(cpu_string_data, 0, 0, -1, -1)
        q.put(cpu_frame)
        print(cpu_frame)
        time.sleep(1.000)


def get_memory_info(q):
    while True:
        memory_string_data = bytes('', 'utf-8')
        virtual_memory = psutil.virtual_memory()
        swap_memory = psutil.swap_memory()
        memory_info = list(virtual_memory+swap_memory)
        for item in memory_info:
            memory_string_data = memory_string_data + struct.pack('<d',item)
        memory_frame = build_frame(memory_string_data, 0, 1, -1, -1)
        q.put(memory_frame)
        print(memory_frame)
        time.sleep(1.000)


def get_disk_info(q):
    while True:
        disk_usage = psutil.disk_usage("/")
        disk_io_counters = psutil.disk_io_counters()
        time.sleep(1.000)
        print(disk_usage)
        print(disk_io_counters)


def get_network_info(q):
    while True:
        net_io_counters = psutil.net_io_counters()
        time.sleep(1.000)
        print(net_io_counters)

这是我的消费者:

def data_handler(q):
    while True:
        next_element = q.get()
        print(next_element)
        print('Item received at data handler queue.')

【问题讨论】:

将生产者和消费者抽象到库中,然后将它们导入到一个文件中?与不是单个父进程的子进程或兄弟进程的进程共享队列(尽管并非不可能)非常困难。看来你已经这样做了,所以我不太明白这个问题...... 【参考方案1】:

我并不完全清楚“如何获取我在主文件中初始化的队列以导入其他函数文件?”是什么意思。 通常,您将队列作为参数传递给函数,并在函数范围内使用它,而不管文件结构如何。或者执行用于任何其他数据类型的任何其他变量共享技术。

但是,您的代码似乎有一些错误。首先,您不应该将queue.Queuemultiprocessing 一起使用。它有它自己的那个类的版本。

q = multiprocessing.Queue()

它比queue.Queue 慢,但它适用于跨进程共享数据。

其次,创建流程对象的正确方法是:

getDataHandlerProcess = multiprocessing.Process(target=data_handler, args = (q,))

否则,您实际上是在调用主线程data_handler(q) 并尝试将其返回值分配给multiprocessing.Processtarget 参数。你的data_handler 函数永远不会返回,所以程序可能在多处理开始之前就进入了无限死锁。编辑:实际上它可能会进入无限等待,试图从一个永远不会被填充的空队列中获取一个元素。

【讨论】:

您的建议非常有效,谢谢!问题是我没有正确使用 .Process() 函数。添加 args 使其正确地统计过程。谢谢!

以上是关于如何将相同的队列放入不同的多处理文件?的主要内容,如果未能解决你的问题,请参考以下文章

Gradle 设置以处理具有相同代码库但依赖项不同的两种部署类型

带指针的多处理和 ctypes

如何使用 nodejs 将消息发送到 RabbitMQ 队列

将7个不同的小球放入4个不同盒子中,每个盒子都不空,则不同的方法中种数有

在python中使用线程时如何保留文件写入顺序

如何将DynamoDB放入请求放入队列