尝试通过运行 Tkinter 的发送进程在进程之间通过管道发送任何内容时出现管道损坏错误

Posted

技术标签:

【中文标题】尝试通过运行 Tkinter 的发送进程在进程之间通过管道发送任何内容时出现管道损坏错误【英文标题】:Broken pipe error when trying to send anything over pipe between processes with sending process running Tkinter 【发布时间】:2021-12-23 00:39:33 【问题描述】:

我正在使用来自multiprocessing 模块(Python 3.8)的PipeProcess。我的初始程序如下所示:

from multiprocessing import Process, Pipe


class Process1(object):
    def __init__(self, pipe_out):
        self.pipe_out = pipe_out

        self.run()

    def run(self):
        try:
            while True:
                print("Sending message to process 2")
                self.pipe_out.send(["hello"])
        except KeyboardInterrupt:
            pass


class Process2(object):
    def __init__(self, pipe_in):
        self.pipe_in = pipe_in

        self.run()

    def run(self):
        try:
            while self.pipe_in.poll():
                request = self.pipe_in.recv()
                method = request[0]
                args = request[1:]

                try:
                    getattr(self, method + "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))

            print("Process 2 done with receiving")
        except KeyboardInterrupt:
            pass

    def hello_callback(self):
        print("Process 1 said hello")


class Controller(object):
    def __init__(self):
        pipe_proc1_out, pipe_proc2_in = Pipe()

        self.proc1 = Process(
            target=Process1,
            args=(pipe_proc1_out, )
        )

        self.proc2 = Process(
            target=Process2,
            args=(pipe_proc2_in, )
        )

    def run(self):
        try:
            self.proc1.start()
            self.proc2.start()

            while True:
                continue
        except KeyboardInterrupt:
            print("Quitting processes...")
            self.proc1.join(1)
            if self.proc1.is_alive():
                self.proc1.terminate()

            self.proc2.join(1)
            if self.proc2.is_alive():
                self.proc2.terminate()

            print("Finished")


def pipes():
    c = Controller()
    c.run()


if __name__ == "__main__":
    pipes()

我有一个Controller 实例,它一直运行到收到键盘中断为止。它还处理Process1Process2两个进程,前者不断发送,后者不断接收。

上面的代码是一个涉及复杂 GUI (PySide)、图像处理 (OpenCV) 和游戏引擎 (Panda3D) 的大型项目的框架。所以我尝试添加 Tkinter 作为 GUI 示例:

from multiprocessing import Process, Pipe
import tkinter as tk


class Process1(tk.Frame):
    def __init__(self, pipe_out):
        self.pipe_out = pipe_out

        self.setup_gui()
        self.run()

    def setup_gui(self):
        self.app = tk.Tk()
        lb1 = tk.Label(self.app, text="Message:")
        lb1.pack()
        self.ent1 = tk.Entry(self.app)
        self.ent1.pack()
        btn1 = tk.Button(self.app, text="Say hello to other process",
                         command=self.btn1_clicked)
        btn1.pack()

    def btn1_clicked(self):
        msg = self.ent1.get()
        self.pipe_out.send(["hello", msg])

    def run(self):
        try:
            self.app.mainloop()
        except KeyboardInterrupt:
            pass


class Process2(object):
    def __init__(self, pipe_in):
        self.pipe_in = pipe_in

        self.run()

    def run(self):
        try:
            while self.pipe_in.poll():
                request = self.pipe_in.recv()
                method = request[0]
                args = request[1:]

                try:
                    getattr(self, method + "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))

            print("Process 2 done with receiving")
        except KeyboardInterrupt:
            pass

    def hello_callback(self, msg):
        print("Process 1 say\"" + msg + "\"")


class Controller(object):
    def __init__(self):
        pipe_proc1_out, pipe_proc2_in = Pipe()

        self.proc1 = Process(
            target=Process1,
            args=(pipe_proc1_out, )
        )

        self.proc2 = Process(
            target=Process2,
            args=(pipe_proc2_in, )
        )

    def run(self):
        try:
            self.proc1.start()
            self.proc2.start()

            while True:
                continue
        except KeyboardInterrupt:
            print("Quitting processes...")
            self.proc1.join(1)
            if self.proc1.is_alive():
                self.proc1.terminate()

            self.proc2.join(1)
            if self.proc2.is_alive():
                self.proc2.terminate()

            print("Finished")


def pipes():
    c = Controller()
    c.run()


if __name__ == "__main__":
    pipes()

请注意,目前 Tkinter 窗口只有在“父”进程通过键盘中断时才能关闭。

每当我单击按钮并调用按钮的命令时,我的程序都会进入错误状态并显示以下消息:

Exception in Tkinter callback
Traceback (most recent call last):
  File "C:\Users\USER\Anaconda3\envs\THS\lib\tkinter\__init__.py", line 1705, in __call__
    return self.func(*args)
  File "C:\Users\USER\PycharmProjects\PythonPlayground\pipes_advanced.py", line 26, in btn1_clicked
    self.pipe_out.send(["hello", 1, 2])
  File "C:\Users\USER\Anaconda3\envs\THS\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\USER\Anaconda3\envs\THS\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed

起初我认为问题在于我从Entry.get() 调用中收到的值(我的 Tkinter 技能已经生疏了)。我打印了msg 并从小部件中获取了文本。

接下来我尝试将一个常量字符串作为我通过管道发送的参数的值:

def btn1_clicked(self):
    self.pipe_out.send(["hello", "world"])

同样的错误出现了。捕获异常BrokenPipeError 对我没有任何好处(除非我想在管道损坏时处理这种情况)。

如果我对程序的第一个版本(没有 Tkinter)做同样的事情,它就可以工作。这让我相信我的问题来自于我集成 Tkinter 的方式。

【问题讨论】:

【参考方案1】:

您遇到的问题是您轮询管道,但 documentation 说:

投票([超时])

返回是否有数据可供读取。 如果没有指定 timeout 则立即返回。

在第一个示例中它可以工作,因为在启动 Process1 时,您会立即将数据发送到管道:

    def run(self):
        try:
            while True:
                print("Sending message to process 2")
                self.pipe_out.send(["hello"])
        except KeyboardInterrupt:
            pass

并且你不断地这样做,所以.poll 将返回True 并且Process2 中的循环将继续。

tkinter 一样,不会立即将任何内容发送到管道,它会等待用户单击按钮,当任何可能发生时,Process2 已经调用了poll,它立即返回了False 和它甚至没有开始那个循环。如果您注意到,那么它也几乎会立即在终端中打印出

“处理2完成接收”

要解决这个问题似乎最容易使用

while self.pipe_in.poll(None):

根据文档的含义

“如果超时为无,则使用无限超时。”

对于像用户界面这样的东西,这似乎是最合适的(至少从用户的角度来看(或者我认为))所以基本上你在Process2 中的run 方法应该是这样的:

    def run(self):
        try:
            while self.pipe_in.poll(None):
                request = self.pipe_in.recv()
                method = request[0]
                args = request[1:]

                try:
                    getattr(self, method + "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))

            print("Process 2 done with receiving")
        except (KeyboardInterrupt, EOFError):
            pass

也与问题无关,但似乎不需要从Process1 中的tk.Frame 继承(或Process2 中的object(除非你真的需要使其与Python2兼容)),你几乎可以从tk.Tk 继承,这应该更容易实际将其用作主窗口,因为self 将是Tk 实例

【讨论】:

确实如此。感谢您指出所有问题。我注意到在我运行 Panda3D 的另一个程序中使用了一个通过引擎的任务管理器触发的任务来进行轮询。所以这就是为什么这个问题从未在那里发生过。至于Frame 继承——确实如此。我最初将其命名为object,但随后从互联网上复制了另一个示例,我猜它已经很旧了,并且在其中。 :D 无论如何,Tkinter 依赖项将消失并被 PySide 取代。不过,很好发现。 @rbaleksandar 谢谢,还有另一个问题需要处理(用可能的解决方案进行编辑,但我想应该处理它以便它也关闭进程或……)这是一个@987654345 @,根据docs:recv() 如果没有任何东西可以接收并且另一端已关闭,则引发 EOFError。 LOL 我刚刚收到错误并回头看这里。感谢您的额外步骤。它绝对提供了一个更干净的出口。我还将为管道添加close(),最后但并非最不重要的是使Tkinter窗口的关闭事件能够关闭底层Controller和所有其他进程。

以上是关于尝试通过运行 Tkinter 的发送进程在进程之间通过管道发送任何内容时出现管道损坏错误的主要内容,如果未能解决你的问题,请参考以下文章

尝试在 tkinter mainloop 旁边运行一个 while true 循环

使用管道在进程之间发送字符串

使用命名管道在两个进程之间发送字符串

进程之间的动态绑定

如何在KVM VM中的java和php进程之间的进程间通信中摆脱tcp-ip发送延迟

shell向子进程发送信号