从 Python 多处理中的排队进程获取错误标志/消息

Posted

技术标签:

【中文标题】从 Python 多处理中的排队进程获取错误标志/消息【英文标题】:Get error flag/message from a queued process in Python multiprocessing 【发布时间】:2021-12-30 13:47:45 【问题描述】:

我正在准备一个 Python 多处理工具,我在其中使用 ProcessQueue 命令。队列正在将另一个脚本放入并行运行的进程中。作为健全性检查,在队列中,我想检查我的其他脚本中是否发生任何错误,并在出现错误时返回标志/消息(status = os.system() 将运行该进程,status 是一个标志错误)。但是我无法将consumer 进程中的队列/子进程中的错误输出到父进程。以下是我的代码的主要部分(缩短):

import os
import time
from multiprocessing import Process, Queue, Lock

command_queue = Queue()
lock = Lock()

p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
for i in range(consumer_num):
    c = Process(target=consumer, args=(command_queue, lock))
    consumers.append(c)

p.daemon = True
p.start()

for c in consumers:
    c.daemon = True
    c.start()

p.join()
for c in consumers:
    c.join()

if error_flag:
    Stop_this_process_and_send_a_message!



def producer(queue, lock, ...):
    for config_path in test_config_list_path:
        queue.put((config_path, process_to_be_queued))



def consumer(queue, lock):
    while True:
        elem = queue.get()
        if elem is None:
            return
        status = os.system(elem[1])
        if status:
            error_flag = 1
    time.sleep(3)

现在我想得到error_flag 并在主代码中使用它来处理事情。但似乎我无法将error_flagconsumer(子)部分输出到代码的主要部分。如果有人可以提供帮助,我将不胜感激。

【问题讨论】:

【参考方案1】:

鉴于您的更新,我还将multiprocessing.Event 实例传递给您的to_do 进程。这使您可以简单地在主进程中的事件上调用wait,这将阻塞直到调用set。自然地,当to_do 或其线程之一检测到脚本错误时,它会在将error_flag.value 设置为True 之后在事件上调用set。这将唤醒主进程,然后主进程可以在进程上调用方法terminate,这将执行您想要的操作。在正常完成to_do 时,仍然需要在事件上调用set,因为主进程在设置事件之前一直处于阻塞状态。但在这种情况下,主进程只会在进程上调用join

单独使用multiprocessing.Value 实例需要定期在循环中检查其值,所以我认为等待multiprocessing.Event 更好。我还使用 cmets 对您的代码进行了其他一些更新,因此请查看它们:

import multiprocessing
from ctypes import c_bool
...

def to_do(event, error_flag):
    # Run the tests
    wrapper_threads.main(event, error_flag)
    # on error or normal process completion:
    event.set()

def git_pull_change(path_to_repo):

    repo = Repo(path)
    current = repo.head.commit

    repo.remotes.origin.pull()
    if current == repo.head.commit:
        print("Repo not changed. Sleep mode activated.")
        # Call to time.sleep(some_number_of_seconds) should go here, right?
        return False
    else:
        print("Repo changed. Start running the tests!")
        return True

def main():
    while True:
        status = git_pull_change(git_path)
        if status:
            # The repo was just pulled, so no point in doing it again:
            #repo = Repo(git_path)
            #repo.remotes.origin.pull()
            event = multiprocessing.Event()
            error_flag = multiprocessing.Value(c_bool, False, lock=False)
            process = multiprocessing.Process(target=to_do, args=(event, error_flag))
            process.start()
            # wait for an error or normal process completion:
            event.wait()
            if error_flag.value:
                print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
                process.terminate() # Kill the process
            else:
                process.join()
            break

【讨论】:

谢谢。我已经使用了您最初的建议,只是将它提升了几个级别,现在它可以工作了。【参考方案2】:

您应该始终在您运行的平台上标记多处理问题。由于我没有在 if __name__ == '__main__': 块中看到您创建进程的代码,因此我不得不假设您运行在使用 OS fork 调用来创建新进程的平台上,例如 Linux。

这意味着您新创建的进程在创建时会继承 error_flag 的值,但出于所有意图和目的,如果进程修改此变量,它会修改存在于地址空间中的此变量的本地副本是该过程独有的。

您需要在共享内存中创建error_flag 并将其作为参数传递给您的进程:

from multiprocessing import Value
from ctypes import c_bool
...
error_flag = Value(c_bool, False, lock=False)
for i in range(consumer_num):
    c = Process(target=consumer, args=(command_queue, lock, error_flag))
    consumers.append(c)
...

if error_flag.value:
    ...
    #Stop_this_process_and_send_a_message!




def consumer(queue, lock, error_flag):
    while True:
        elem = queue.get()
        if elem is None:
            return
        status = os.system(elem[1])
        if status:
            error_flag.value = True
    time.sleep(3)

但我有一个问题/cmets 想问你。您在原始代码中有以下语句:

if error_flag:
    Stop_this_process_and_send_a_message!

但是这个语句位于之后你已经加入了所有启动的进程。那么有哪些进程要停止以及您将消息发送到哪里(您可能有多个消费者,其中任何一个都可能正在设置error_flag - 顺便说一句,由于设置了值,因此无需在锁定下完成此操作True 是一个原子动作)。而且由于您要加入所有进程,即等待它们完成,我不确定您为什么要让它们成为守护进程。您还将Lock 实例传递给您的生产者和消费者,但它根本没有被使用。

您的消费者在从队列中获得None 记录时返回。所以如果你有N个消费者,test_config_path的最后N个元素需要是None

我也认为不需要producer 进程。主进程也可以在启动消费者进程之前甚至之后将所有记录写入队列。

在函数consumer 结束时对time.sleep(3) 的调用无法访问。

【讨论】:

感谢 Booboo,这对理解如何处理这个 arg 很有帮助。但是,我的工具箱中有一个嵌套的多进程进程。一个进程正在在线检查变量,如果变量发生更改,它会并行触发另一个进程,该进程本身正在运行一个单独的多进程以并行执行一些测试。让我给你一个我的工具箱的总结/流程图。这里有字符限制,所以我将它作为答案发布。 我刚刚发布了完整的问题。【参考方案3】:

所以上面的代码总结是并行运行一些测试的内部过程。我从中删除了 def 函数部分,但假设它是以下代码摘要中的 wrapper_threads。在这里,我将添加正在检查变量的父进程(假设在我的 git 存储库中提交)。以下流程是无限期运行的,当有变化时,它将触发主要问题中的多进程:

def to_do():
    # Run the tests
    wrapper_threads.main()


def git_pull_change(path_to_repo):

    repo = Repo(path)
    current = repo.head.commit

    repo.remotes.origin.pull()
    if current == repo.head.commit:
        print("Repo not changed. Sleep mode activated.")
        return False
    else:
        print("Repo changed. Start running the tests!")
        return True

def main():
    process = None
    while True:
        status = git_pull_change(git_path)

    if status:
        repo = Repo(git_path)
        repo.remotes.origin.pull()
        process = multiprocessing.Process(target=to_do)
        process.start()

    if error_flag.value:
        print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
        os.system('pkill -U user XXX')
        break

现在我想将 error_flag 从子进程传播到这个进程并停止进程 XXX。问题是我不知道如何将 error_flag 带到这个(大)父进程。

【讨论】:

首先,你的缩进关闭了吗?在函数main 中,您将在循环中重复执行status = git_pull_change(git_path),并且永远不会测试status。因此,关于您是否甚至可以启动多个processes 的逻辑并不那么清楚,因为您将不断检查git pull 是否导致更改,或者一旦您启动to_do 进程,您将等到出现错误。似乎您的意图是启动多个进程,但我想确定一下。通过更正和澄清更新您的问题,并在您这样做后向我发表评论。 我猜你不能在中间或运行测试时重新拉取存储库,所以它只能是一次测试。

以上是关于从 Python 多处理中的排队进程获取错误标志/消息的主要内容,如果未能解决你的问题,请参考以下文章

什么是多线程,多进程?

如何在 Python 中限制和排队进程

python中的多处理[破池进程]

python多处理日志记录:带有RotatingFileHandler的QueueHandler“文件被另一个进程使用”错误

python多进程存数据不改变顺序

多处理在新控制台 python 中运行进程以获取每个进程的输入