在子进程运行和通信时终止子进程,这些子进程通过队列干净地通信

Posted

技术标签:

【中文标题】在子进程运行和通信时终止子进程,这些子进程通过队列干净地通信【英文标题】:Terminating Subprocesses while they are running and communicating which communicate via queues cleanly 【发布时间】:2021-01-31 06:44:05 【问题描述】:

我正在做一个更大的项目,我有 2 个线程(相同的进程)和一个单独的进程。其中一个线程是 gui,另一个线程是哨兵线程,观察子进程,子进程正在用神经网络做一些繁重的工作。架构看起来有点像这样:

我需要能够取消神经网络的进程并分别结束哨兵线程。我创建了一个小例子,它展示了总体架构和我的方法。

from multiprocessing import Process, Queue
from threading import Thread
from time import sleep
 
 
class Worker(Process):
    # The worker resembles the neural network. It does some calculations and shares
    # the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
 
    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1
 
    def stop(self):
        # I used the stop function for trying out some things, like using a joinable 
        # queue and block execution as long as the queue is not empty, which is not 
        # working
        self.queue.put(None)
        self.terminate()
 
 
class Listener(Thread):
    # This class resembles the sentinel thread. It checks in an infinite loop for
    # messages. In the real application I send signals via the signals and slots
    # design pattern to the gui and display the sent information.
 
    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)
 
    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")
 
    def stop(self):
        self.worker.stop()
 
 
class System:
    # This class resembles the gui
 
    def __init__(self):
        self.listener = Listener()
 
    def start(self):
        self.listener.start()
 
    def stop(self):
        self.listener.stop()
 
 
if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()

有什么问题?

只要一个进程读取或写入队列,和/或队列没有被正确清空,一个或两个进程就会变成僵尸进程,这在某种意义上基本上是死锁。因此,我需要找到一种方法在终止进程时正确处理队列,从而使进程终止时不会出错。

到目前为止我所做的尝试:

    对每个 task_done() 使用 Joinable Queue 和 join()

    重写 SIGTERM 信号处理程序以等待队列被清空

    使用可加入队列并且仅在 SIGTERM 信号处理程序中加入()

结果

    处理速度大大下降,但终止工作正常

    和 3. 终止不像我实现的那样工作 有时它有效,有时它没有。所以这种方法没有可靠的输出和知识

对 (3) 的尝试如下:

class Worker(Process):
 
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)
 
    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)
 
    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)

【问题讨论】:

给系统加个心跳机制怎么样?让进程通信,它们每 N 秒就启动并运行一次。添加逻辑以在双方自 T 秒后未收到心跳时停止运行。 afaik 是队列中最大的问题。我需要工作进程停止将消息放入队列并让哨兵进程清理队列并获取所有消息。我还看不出心跳如何帮助解决这个问题。 为什么它不再有用了? (1) 如果没有收到来自哨兵的心跳,worker 将停止将消息放入队列。 (2) 如果 Sentinel 没有收到来自 worker 的心跳,它会清理队列并获取所有消息。 如果工作类不使用主循环进行计算,而是进行长时间的顺序操作,您对实现它有何建议? 【参考方案1】:

有多种可能的方法,但如果您希望在性能和稳健性之间取得折衷,我建议您仅使用 signal-handler 来设置 .running- 标志 在工作人员上,并在worker.run() 中使用while self.running 对其进行检查。循环中断后,您从 worker 发送哨兵值。这确保了哨兵值始终是队列中的最后一个值,并且所有值都由侦听器读取。这种布局一起允许工作人员正常关闭,同时仍然避免更昂贵的同步来检查退出条件。

from multiprocessing import Process, Queue
from functools import partial
from threading import Thread
from time import sleep
import signal


SENTINEL = 'SENTINEL'


def sigterm_handler(signum, frame, worker):
    worker.shutdown()


def register_sigterm(worker):
    global sigterm_handler
    sigterm_handler = partial(sigterm_handler, worker=worker)
    signal.signal(signal.SIGTERM, sigterm_handler)


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.running = False

    def run(self):
        register_sigterm(self)
        self.running = True
        i = 0
        while self.running:
            self.queue.put(i)
            i += 1
        self.queue.put(SENTINEL)

    def stop(self):  # called by parent
        self.terminate()

    def shutdown(self):  # called by child from signal-handler
        self.running = False


class Listener(Thread):

    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        for data in iter(self.queue.get, SENTINEL):
            print(data)

    def stop(self):
        self.worker.stop()
        self.worker.join()


class System:

    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":

    system = System()
    system.start()
    sleep(0.1)
    system.stop()

考虑以下实验。

这个想法是用猴子修补一个队列实例,在收到 SIGTERM 后,下一次调用 queue.put(),传递的值和指定的哨兵值被发送,queue.close()sys.exit() 被调用。这允许彻底关闭,同时避免重复的标志检查。

multiprocessing.Queue() 实际上只是multiprocessing.context.BaseContext 上的一个方法,返回一个预配置的multiprocessing.queues.Queue 实例。为了不干扰它,我选择了组合而不是继承。到目前为止的测试表明它工作得很好。

stqueue.py

import sys
import time
import signal
from functools import partial
from multiprocessing import current_process as curr_p


def _shutdown(self):
    self._xput = self.put
    self.put = self.final_put


def _final_put(self, obj):
    self._xput(obj)
    self._xput(self._xsentinel)
    self.close()
    sys.exit(0)


def _sigterm_handler(signum, frame, queue):
    print(f"[time.ctime(), curr_p().name] --- handling signal")
    queue.shutdown()


def register_sigterm_queue(queue, sentinel):
    """Monkey-patch queue-instance to shutdown process
    after next call to `queue.put()` upon receipt of SIGTERM.
    """
    queue._xsentinel = sentinel
    queue.shutdown = _shutdown.__get__(queue)
    queue.final_put = _final_put.__get__(queue)
    global _sigterm_handler
    _sigterm_handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, _sigterm_handler)

main.py

import time
from threading import Thread
import multiprocessing as mp
from multiprocessing import Process, Queue, current_process as curr_p

import numpy as np

from stqueue import register_sigterm_queue


SENTINEL = 'SENTINEL'


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        register_sigterm_queue(self.queue, SENTINEL)  # <<<
        while True:
            print(f"[time.ctime(), curr_p().name] --- starting numpy")
            r = np.sum(
                np.unique(np.random.randint(0, 2500, 100_000_000))
            )
            print(f"[time.ctime(), curr_p().name] --- ending numpy")
            self.queue.put(r)

    def stop(self):  # called by parent
        self.terminate()

...


if __name__ == "__main__":

    import logging
    mp.log_to_stderr(logging.DEBUG)

    system = System()
    system.start()
    time.sleep(10)
    print(f"[time.ctime(), curr_p().name] --- sending signal")
    system.stop()
    print(f"[time.ctime(), curr_p().name] --- signal send")

示例输出:

[DEBUG/MainProcess] created semlock with handle 140000699432960
[DEBUG/MainProcess] created semlock with handle 140000699428864
[DEBUG/MainProcess] created semlock with handle 140000664752128
[DEBUG/MainProcess] Queue._after_fork()
[Sat Oct 24 21:59:58 2020, Worker-1] --- starting numpy
[DEBUG/Worker-1] recreated blocker with handle 140000699432960
[DEBUG/Worker-1] recreated blocker with handle 140000699428864
[DEBUG/Worker-1] recreated blocker with handle 140000664752128
[DEBUG/Worker-1] Queue._after_fork()
[INFO/Worker-1] child process calling self.run()
[DEBUG/Worker-1] Queue._start_thread()
[DEBUG/Worker-1] doing self._thread.start()
[DEBUG/Worker-1] starting thread to feed data to pipe
[DEBUG/Worker-1] ... done self._thread.start()
[Sat Oct 24 22:00:04 2020, Worker-1] --- ending numpy
[Sat Oct 24 22:00:04 2020, Worker-1] --- starting numpy
3123750
[Sat Oct 24 22:00:08 2020, MainProcess] --- sending signal
[Sat Oct 24 22:00:10 2020, Worker-1] --- handling signal
[DEBUG/Worker-1] telling queue thread to quit
[INFO/Worker-1] process shutting down
[DEBUG/Worker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Worker-1] running the remaining "atexit" finalizers
[DEBUG/Worker-1] joining queue thread
[DEBUG/Worker-1] feeder thread got sentinel -- exiting
[DEBUG/Worker-1] ... queue thread joined
[INFO/Worker-1] process exiting with exitcode 0
[Sat Oct 24 22:00:10 2020, Worker-1] --- ending numpy
3123750
[Sat Oct 24 22:00:10 2020, MainProcess] --- signal send
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Process finished with exit code 0

【讨论】:

@ThomasChristopherDavies 如果没有循环,工人是否只发回一次结果?队列上的 wlock 不是递归的,因此如果您的信号处理程序尝试写入队列,而您的常规代码已经将其保存在后台,它就会死锁。 @ThomasChristopherDavies 那么神经网络也是多线程的?是的,如果您只是终止所涉及的进程,就会破坏队列,这不足为奇。但是我仍然不清楚为什么你不能让这个进程检查一个标志,而不是在它再次将某些东西放入队列之后或之前。 @ThomasChristopherDavies 如果有什么不清楚的地方可以在这​​里提问,这样未来的读者也可以从中受益。 @ThomasChristopherDavies 该解决方案修补队列,以便在收到 SIGTERM 时,处理程序本身发生的所有事情都是原始的 .put() 方法被替换为 ._final_put() 方法。然后下一次处理程序之外的常规代码调用.put(),它真的调用.final_put(),然后通过将最后一个项目+哨兵与原始.put()(保存为.xput())并引发@ 987654344@。这避免了从处理程序内部干扰同步原语,这可能导致死锁。 @ThomasChristopherDavies _shutdown.__get__(queue) 正在使函数 _shutdown() 成为队列实例 see 的绑定方法。不得不再次查找,这不是您经常需要做的。 partial 只是创建了一个包装器,它用我们正在修补的现有队列实例填充处理程序的 queue 参数,因此我们在处理程序中可以引用它。

以上是关于在子进程运行和通信时终止子进程,这些子进程通过队列干净地通信的主要内容,如果未能解决你的问题,请参考以下文章

通过单击按钮终止正在运行的子进程

Linux的SIGUSR1和SIGUSR2信号

与子进程通信,无需等待子进程在 windows 上终止

如何在父进程终止后终止所有子进程?

进程控制---子进程终止状态相关的宏

子进程在终止前获取结果