Python 多处理:如何启动相互依赖的进程?

Posted

技术标签:

【中文标题】Python 多处理:如何启动相互依赖的进程?【英文标题】:Python multiprocessing: How to start processes that depend on each other? 【发布时间】:2021-12-26 12:39:33 【问题描述】:

我有一个关于 Python 多处理方法的基本问题,即如何以最佳方式启动使用队列传输数据的不同进程。

为此,我使用了一个简单的例子

    数据已收到 数据已处理 数据已发送

所有上面的步骤应该通过三个不同的过程并行发生。

这里是示例代码:

import multiprocessing
import keyboard
import time

def getData(queue_raw):
    for num in range(1000):
        queue_raw.put(num)
        print("getData: put "+ str(num)+" in queue_raw")
    while True:
        if keyboard.read_key() == "s":
            break

def calcFeatures(queue_raw, queue_features):
    while not queue_raw.empty():
        data = queue_raw.get()
        queue_features.put(data**2)
        print("calcFeatures: put "+ str(data**2)+" in queue_features")

def sendFeatures(queue_features):
    while not queue_features.empty():
        feature = queue_features.get()
        print("sendFeatures: put "+ str(feature)+" out")

if __name__ == "__main__":

    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()

    processes = [

        multiprocessing.Process(target=getData, args=(queue_raw,)),
        multiprocessing.Process(target=calcFeatures, args=(queue_raw, queue_features,)),
        multiprocessing.Process(target=sendFeatures, args=(queue_features,))
    ]

    processes[0].start()
    time.sleep(0.1)
    processes[1].start()
    time.sleep(0.1)
    processes[2].start()

    #for p in processes:
    #    p.start()
    for p in processes:
        p.join()

这个程序有效,但我的问题是关于不同进程的开始。 理想情况下,process[1] 应该仅在 process[0] 将数据放入 queue_raw 时启动;而process[2] 应该只在process[1] 将计算的特征放入queue_features 时启动。

现在我是通过 time.sleep() 函数完成的,这是次优的,因为我不一定知道这些过程需要多长时间。 我也尝试过类似的东西:

processes[0].start()
while queue_raw.empty():
    time.sleep(0.5)
processes[1].start()

但它不起作用,因为只估计了第一个过程。有什么方法可以完成这个过程取决于开始吗?

【问题讨论】:

启动所有进程并将它们设计为无限睡眠循环,除非有一些工作要做(在相应队列中找到数据)? 如果队列中没有数据,Queue.get() 应该阻塞。你确定要睡在那里吗? (docs.python.org/3/library/…) @moooeeeep 是的,根据文档,情况并非如此。但是,如果我将睡眠语句排除在外,程序宁愿不执行其他进程,或者它会执行,但由于队列仍然是空的,进程已经完成。然后键盘中断也不起作用。我在没有打印语句的情况下也进行了相同的测试,但它仍然无法正常工作。所以我认为我通常缺少一个基本概念,当传递的队列在进程之间互换使用时,进程是如何启动的 您应该将退出条件更改为while True,或者检查其他一些标志以通知进程退出。 示例:***.com/questions/48569731/… 【参考方案1】:

@moooeeeep 指出了正确的评论。 使用while not queue.empty(): 进行检查并不是等到数据实际在队列中!

通过哨兵对象(此处为None)和while True 循环的方法将强制进程等待,直到其他进程将数据放入队列:

FLAG_STOP=False
while FLAG_STOP is False:
    data = queue_raw.get()  # get will wait
    if data is None:
        # Finish analysis
        FLAG_STOP = True
    else:
        # work with data

【讨论】:

以上是关于Python 多处理:如何启动相互依赖的进程?的主要内容,如果未能解决你的问题,请参考以下文章

并发 并行 同步 异步 多线程 阻塞 非阻塞的区别

当某些项目相互依赖时,如何运行异步进程列表?

SpringBoot子模块相互依赖打包

如何同步三个相互依赖的任务的循环执行?

并发,并行,同步,异步的区别

Python进阶 - 多线程多进程基础