从 Python 多处理中的排队进程获取错误标志/消息
Posted
技术标签:
【中文标题】从 Python 多处理中的排队进程获取错误标志/消息【英文标题】:Get error flag/message from a queued process in Python multiprocessing 【发布时间】:2021-12-30 13:47:45 【问题描述】:我正在准备一个 Python 多处理工具,我在其中使用 Process
和 Queue
命令。队列正在将另一个脚本放入并行运行的进程中。作为健全性检查,在队列中,我想检查我的其他脚本中是否发生任何错误,并在出现错误时返回标志/消息(status = os.system()
将运行该进程,status
是一个标志错误)。但是我无法将consumer
进程中的队列/子进程中的错误输出到父进程。以下是我的代码的主要部分(缩短):
import os
import time
from multiprocessing import Process, Queue, Lock
command_queue = Queue()
lock = Lock()
p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
for i in range(consumer_num):
c = Process(target=consumer, args=(command_queue, lock))
consumers.append(c)
p.daemon = True
p.start()
for c in consumers:
c.daemon = True
c.start()
p.join()
for c in consumers:
c.join()
if error_flag:
Stop_this_process_and_send_a_message!
def producer(queue, lock, ...):
for config_path in test_config_list_path:
queue.put((config_path, process_to_be_queued))
def consumer(queue, lock):
while True:
elem = queue.get()
if elem is None:
return
status = os.system(elem[1])
if status:
error_flag = 1
time.sleep(3)
现在我想得到error_flag
并在主代码中使用它来处理事情。但似乎我无法将error_flag
从consumer
(子)部分输出到代码的主要部分。如果有人可以提供帮助,我将不胜感激。
【问题讨论】:
【参考方案1】:鉴于您的更新,我还将multiprocessing.Event
实例传递给您的to_do
进程。这使您可以简单地在主进程中的事件上调用wait
,这将阻塞直到调用set
。自然地,当to_do
或其线程之一检测到脚本错误时,它会在将error_flag.value
设置为True
之后在事件上调用set
。这将唤醒主进程,然后主进程可以在进程上调用方法terminate
,这将执行您想要的操作。在正常完成to_do
时,仍然需要在事件上调用set
,因为主进程在设置事件之前一直处于阻塞状态。但在这种情况下,主进程只会在进程上调用join
。
单独使用multiprocessing.Value
实例需要定期在循环中检查其值,所以我认为等待multiprocessing.Event
更好。我还使用 cmets 对您的代码进行了其他一些更新,因此请查看它们:
import multiprocessing
from ctypes import c_bool
...
def to_do(event, error_flag):
# Run the tests
wrapper_threads.main(event, error_flag)
# on error or normal process completion:
event.set()
def git_pull_change(path_to_repo):
repo = Repo(path)
current = repo.head.commit
repo.remotes.origin.pull()
if current == repo.head.commit:
print("Repo not changed. Sleep mode activated.")
# Call to time.sleep(some_number_of_seconds) should go here, right?
return False
else:
print("Repo changed. Start running the tests!")
return True
def main():
while True:
status = git_pull_change(git_path)
if status:
# The repo was just pulled, so no point in doing it again:
#repo = Repo(git_path)
#repo.remotes.origin.pull()
event = multiprocessing.Event()
error_flag = multiprocessing.Value(c_bool, False, lock=False)
process = multiprocessing.Process(target=to_do, args=(event, error_flag))
process.start()
# wait for an error or normal process completion:
event.wait()
if error_flag.value:
print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
process.terminate() # Kill the process
else:
process.join()
break
【讨论】:
谢谢。我已经使用了您最初的建议,只是将它提升了几个级别,现在它可以工作了。【参考方案2】:您应该始终在您运行的平台上标记多处理问题。由于我没有在 if __name__ == '__main__':
块中看到您创建进程的代码,因此我不得不假设您运行在使用 OS fork
调用来创建新进程的平台上,例如 Linux。
这意味着您新创建的进程在创建时会继承 error_flag
的值,但出于所有意图和目的,如果进程修改此变量,它会修改存在于地址空间中的此变量的本地副本是该过程独有的。
您需要在共享内存中创建error_flag
并将其作为参数传递给您的进程:
from multiprocessing import Value
from ctypes import c_bool
...
error_flag = Value(c_bool, False, lock=False)
for i in range(consumer_num):
c = Process(target=consumer, args=(command_queue, lock, error_flag))
consumers.append(c)
...
if error_flag.value:
...
#Stop_this_process_and_send_a_message!
def consumer(queue, lock, error_flag):
while True:
elem = queue.get()
if elem is None:
return
status = os.system(elem[1])
if status:
error_flag.value = True
time.sleep(3)
但我有一个问题/cmets 想问你。您在原始代码中有以下语句:
if error_flag:
Stop_this_process_and_send_a_message!
但是这个语句位于之后你已经加入了所有启动的进程。那么有哪些进程要停止以及您将消息发送到哪里(您可能有多个消费者,其中任何一个都可能正在设置error_flag
- 顺便说一句,由于设置了值,因此无需在锁定下完成此操作True
是一个原子动作)。而且由于您要加入所有进程,即等待它们完成,我不确定您为什么要让它们成为守护进程。您还将Lock
实例传递给您的生产者和消费者,但它根本没有被使用。
您的消费者在从队列中获得None
记录时返回。所以如果你有N个消费者,test_config_path
的最后N个元素需要是None
。
我也认为不需要producer
进程。主进程也可以在启动消费者进程之前甚至之后将所有记录写入队列。
在函数consumer
结束时对time.sleep(3)
的调用无法访问。
【讨论】:
感谢 Booboo,这对理解如何处理这个 arg 很有帮助。但是,我的工具箱中有一个嵌套的多进程进程。一个进程正在在线检查变量,如果变量发生更改,它会并行触发另一个进程,该进程本身正在运行一个单独的多进程以并行执行一些测试。让我给你一个我的工具箱的总结/流程图。这里有字符限制,所以我将它作为答案发布。 我刚刚发布了完整的问题。【参考方案3】:所以上面的代码总结是并行运行一些测试的内部过程。我从中删除了 def 函数部分,但假设它是以下代码摘要中的 wrapper_threads
。在这里,我将添加正在检查变量的父进程(假设在我的 git 存储库中提交)。以下流程是无限期运行的,当有变化时,它将触发主要问题中的多进程:
def to_do():
# Run the tests
wrapper_threads.main()
def git_pull_change(path_to_repo):
repo = Repo(path)
current = repo.head.commit
repo.remotes.origin.pull()
if current == repo.head.commit:
print("Repo not changed. Sleep mode activated.")
return False
else:
print("Repo changed. Start running the tests!")
return True
def main():
process = None
while True:
status = git_pull_change(git_path)
if status:
repo = Repo(git_path)
repo.remotes.origin.pull()
process = multiprocessing.Process(target=to_do)
process.start()
if error_flag.value:
print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
os.system('pkill -U user XXX')
break
现在我想将 error_flag
从子进程传播到这个进程并停止进程 XXX
。问题是我不知道如何将 error_flag
带到这个(大)父进程。
【讨论】:
首先,你的缩进关闭了吗?在函数main
中,您将在循环中重复执行status = git_pull_change(git_path)
,并且永远不会测试status
。因此,关于您是否甚至可以启动多个processes
的逻辑并不那么清楚,因为您将不断检查git pull
是否导致更改,或者一旦您启动to_do
进程,您将等到出现错误。似乎您的意图是启动多个进程,但我想确定一下。通过更正和澄清更新您的问题,并在您这样做后向我发表评论。
我猜你不能在中间或运行测试时重新拉取存储库,所以它只能是一次测试。以上是关于从 Python 多处理中的排队进程获取错误标志/消息的主要内容,如果未能解决你的问题,请参考以下文章
python多处理日志记录:带有RotatingFileHandler的QueueHandler“文件被另一个进程使用”错误