Tkinter:如何使用线程来防止主事件循环“冻结”

Posted

技术标签:

【中文标题】Tkinter:如何使用线程来防止主事件循环“冻结”【英文标题】:Tkinter: How to use threads to preventing main event loop from "freezing" 【发布时间】:2013-05-20 16:14:54 【问题描述】:

我有一个带有“开始”按钮和进度条的小型 GUI 测试。期望的行为是:

点击开始 进度条振荡 5 秒 进度条停止

观察到的行为是“开始”按钮冻结 5 秒,然后显示进度条(无振荡)。

到目前为止,这是我的代码:

class GUI:
    def __init__(self, master):
        self.master = master
        self.test_button = Button(self.master, command=self.tb_click)
        self.test_button.configure(
            text="Start", background="Grey",
            padx=50
            )
        self.test_button.pack(side=TOP)

    def progress(self):
        self.prog_bar = ttk.Progressbar(
            self.master, orient="horizontal",
            length=200, mode="indeterminate"
            )
        self.prog_bar.pack(side=TOP)

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        # Simulate long running process
        t = threading.Thread(target=time.sleep, args=(5,))
        t.start()
        t.join()
        self.prog_bar.stop()

root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()

根据 Bryan Oakley here 的信息,我知道我需要使用线程。我尝试创建一个线程,但我猜测由于该线程是从主线程中启动的,它没有帮助。

我的想法是将逻辑部分放在不同的类中,并在该类中实例化 GUI,类似于 A. Rodas here 的示例代码。

我的问题:

我不知道如何编码,所以这个命令:

self.test_button = Button(self.master, command=self.tb_click)

调用位于另一个类中的函数。这是一件坏事还是有可能?我将如何创建可以处理 self.tb_click 的第二类?我尝试遵循 A. Rodas 的示例代码,该代码运行良好。但我不知道如何在触发动作的 Button 小部件的情况下实现他的解决方案。

如果我应该在单个 GUI 类中处理线程,如何创建一个不干扰主线程的线程?

【问题讨论】:

【参考方案1】:

当你在主线程中加入新线程时,它会等到线程完成,所以即使你使用多线程,GUI也会阻塞。

如果要将逻辑部分放在不同的类中,可以直接子类化Thread,然后在按下按钮时启动该类的新对象。这个 Thread 子类的构造函数可以接收一个 Queue 对象,然后您将能够与 GUI 部分进行通信。所以我的建议是:

    在主线程中创建队列对象 创建一个可以访问该队列的新线程 定期检查主线程中的队列

然后您必须解决如果用户单击两次相同的按钮会发生什么的问题(每次单击都会产生一个新线程),但是您可以通过禁用开始按钮并在您之后再次启用它来解决它拨打self.prog_bar.stop()

import queue

class GUI:
    # ...

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        self.queue = queue.Queue()
        ThreadedTask(self.queue).start()
        self.master.after(100, self.process_queue)

    def process_queue(self):
        try:
            msg = self.queue.get_nowait()
            # Show result of the task if needed
            self.prog_bar.stop()
        except queue.Empty:
            self.master.after(100, self.process_queue)

class ThreadedTask(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        time.sleep(5)  # Simulate long running process
        self.queue.put("Task finished")

【讨论】:

另一个漂亮的例子。谢谢 A. Rodas :) 我有一个后续问题:如果我注释掉 self.master.after(100, self.process_queue) 并用简单的 self.process_queue() 替换它,行为是一样的。是否有充分的理由拥有 self.master.after... 部分? 是的,使用self.master.after(100, self.process_queue),您每 100 毫秒安排一次此方法,而self.process_queue() 会在每次调用之间无延迟地持续执行它。没有必要这样做,所以after 是一个更好的解决方案来定期检查内容。 @citizen2077 如果你想阻止用户这样做,你可以处理WM_DELETE_PROTOCOL,并且只有在线程不活跃时才销毁GUI。 @citizen2077 添加一个处理程序将是定义如果使用窗口管理器关闭根会发生什么的第一步,但您也可以使用一个标志来通知线程它应该停止执行。请随意单独提出您的问题,因为它与 OP 的问题没有严格的关系。 关于您最近的更新:如果您之前通过 from Queue import Queue 导入,则只需更改这一行即可从 Python 2 切换到 Python 3。此外,还可以使用 @ 987654328@ 在 Python 2 中,它仍然可以在 Python 3 中工作,因为旧语法仍然被接受。【参考方案2】:

我将提交替代解决方案的依据。它本身并不特定于 Tk 进度条,但它当然可以很容易地实现。

这里有一些类允许您在 Tk 的后台运行其他任务,在需要时更新 Tk 控件,并且不会锁定 gui!

这里是 TkRepeatingTask 和 BackgroundTask 类:

import threading

class TkRepeatingTask():

    def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
        self.__tk_   = tkRoot
        self.__func_ = taskFuncPointer        
        self.__freq_ = freqencyMillis
        self.__isRunning_ = False

    def isRunning( self ) : return self.__isRunning_ 

    def start( self ) : 
        self.__isRunning_ = True
        self.__onTimer()

    def stop( self ) : self.__isRunning_ = False

    def __onTimer( self ): 
        if self.__isRunning_ :
            self.__func_() 
            self.__tk_.after( self.__freq_, self.__onTimer )

class BackgroundTask():

    def __init__( self, taskFuncPointer ):
        self.__taskFuncPointer_ = taskFuncPointer
        self.__workerThread_ = None
        self.__isRunning_ = False

    def taskFuncPointer( self ) : return self.__taskFuncPointer_

    def isRunning( self ) : 
        return self.__isRunning_ and self.__workerThread_.isAlive()

    def start( self ): 
        if not self.__isRunning_ :
            self.__isRunning_ = True
            self.__workerThread_ = self.WorkerThread( self )
            self.__workerThread_.start()

    def stop( self ) : self.__isRunning_ = False

    class WorkerThread( threading.Thread ):
        def __init__( self, bgTask ):      
            threading.Thread.__init__( self )
            self.__bgTask_ = bgTask

        def run( self ):
            try :
                self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
            except Exception as e: print repr(e)
            self.__bgTask_.stop()

这是一个 Tk 测试,它演示了这些的使用。如果您想查看实际演示,只需将其附加到包含这些类的模块底部即可:

def tkThreadingTest():

    from tkinter import Tk, Label, Button, StringVar
    from time import sleep

    class UnitTestGUI:

        def __init__( self, master ):
            self.master = master
            master.title( "Threading Test" )

            self.testButton = Button( 
                self.master, text="Blocking", command=self.myLongProcess )
            self.testButton.pack()

            self.threadedButton = Button( 
                self.master, text="Threaded", command=self.onThreadedClicked )
            self.threadedButton.pack()

            self.cancelButton = Button( 
                self.master, text="Stop", command=self.onStopClicked )
            self.cancelButton.pack()

            self.statusLabelVar = StringVar()
            self.statusLabel = Label( master, textvariable=self.statusLabelVar )
            self.statusLabel.pack()

            self.clickMeButton = Button( 
                self.master, text="Click Me", command=self.onClickMeClicked )
            self.clickMeButton.pack()

            self.clickCountLabelVar = StringVar()            
            self.clickCountLabel = Label( master,  textvariable=self.clickCountLabelVar )
            self.clickCountLabel.pack()

            self.threadedButton = Button( 
                self.master, text="Timer", command=self.onTimerClicked )
            self.threadedButton.pack()

            self.timerCountLabelVar = StringVar()            
            self.timerCountLabel = Label( master,  textvariable=self.timerCountLabelVar )
            self.timerCountLabel.pack()

            self.timerCounter_=0

            self.clickCounter_=0

            self.bgTask = BackgroundTask( self.myLongProcess )

            self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )

        def close( self ) :
            print "close"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass            
            self.master.quit()

        def onThreadedClicked( self ):
            print "onThreadedClicked"
            try: self.bgTask.start()
            except: pass

        def onTimerClicked( self ) :
            print "onTimerClicked"
            self.timer.start()

        def onStopClicked( self ) :
            print "onStopClicked"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass                        

        def onClickMeClicked( self ):
            print "onClickMeClicked"
            self.clickCounter_+=1
            self.clickCountLabelVar.set( str(self.clickCounter_) )

        def onTimer( self ) :
            print "onTimer"
            self.timerCounter_+=1
            self.timerCountLabelVar.set( str(self.timerCounter_) )

        def myLongProcess( self, isRunningFunc=None ) :
            print "starting myLongProcess"
            for i in range( 1, 10 ):
                try:
                    if not isRunningFunc() :
                        self.onMyLongProcessUpdate( "Stopped!" )
                        return
                except : pass   
                self.onMyLongProcessUpdate( i )
                sleep( 1.5 ) # simulate doing work
            self.onMyLongProcessUpdate( "Done!" )                

        def onMyLongProcessUpdate( self, status ) :
            print "Process Update: %s" % (status,)
            self.statusLabelVar.set( str(status) )

    root = Tk()    
    gui = UnitTestGUI( root )
    root.protocol( "WM_DELETE_WINDOW", gui.close )
    root.mainloop()

if __name__ == "__main__": 
    tkThreadingTest()

我将强调有关 BackgroundTask 的两个重点:

1) 你在后台任务中运行的函数需要一个函数指针,它将调用和尊重,这允许任务在中途取消 - 如果可能的话。

2) 您需要确保退出应用程序时后台任务已停止。如果您不解决该问题,即使您的 gui 已关闭,该线程仍将运行!

【讨论】:

哇,我认为您不了解after() 方法的工作原理。在接受的答案中,self.master.after(100, self.process_queue) 不会递归地调用 self.process_queue。它只是 安排 在 100 毫秒内再次运行。第二个参数只是函数的名称,而不是对它的调用——它仅在引发异常 Queue.Empty 时才这样做,这意味着 ThreadedTask 尚未将任何内容放入队列中,因此它需要保留检查。 @martineau 我希望你是对的!我通过一些细微的调整运行它,但由于递归调用过多而崩溃。在其他语言和库中,我使用了非常相似的重复计时器,没有问题。我很想看到它看起来应该的工作方式(即非递归)。当我成功时,我会玩这个并收回我的答案。尽管我的 BackgroundTask 类至少在我的示例中仍然运行良好 - 我还没有对它进行足够的测试以知道它会因为 tk 是非线程安全的而窒息,但是,这让我很担心! 我对我所说的话非常有信心。 Tkinter 不是线程安全的并不意味着您不能在多线程应用程序中使用它。只是您必须将同时访问 Tkinter 的线程数限制为一个(通常由主线程决定)。 My answer 到另一个 Tkinter 问题有一个这样做的例子。 你说的很对!我收回了我苛刻的 cmets。我从根本上改变了我的帖子。我确实看到了递归崩溃,但肯定有其他事情发生。 您可以通过将其daemon 属性设置为True 来实现不需要在退出应用程序之前停止后台任务。有关更多详细信息和相关文档的链接,请参阅 my answer 到另一个问题。【参考方案3】:

问题是 t.join() 阻塞了点击事件,主线程没有回到事件循环来处理重绘。 见Why ttk Progressbar appears after process in Tkinter或TTK progress bar blocked when sending email

【讨论】:

【参考方案4】:

我使用了 RxPY,它有一些不错的线程函数,以相当干净的方式解决了这个问题。没有队列,并且我提供了一个在后台线程完成后在主线程上运行的函数。这是一个工作示例:

import rx
from rx.scheduler import ThreadPoolScheduler
import time
import tkinter as tk

class UI:
   def __init__(self):
      self.root = tk.Tk()
      self.pool_scheduler = ThreadPoolScheduler(1) # thread pool with 1 worker thread
      self.button = tk.Button(text="Do Task", command=self.do_task).pack()

   def do_task(self):
      rx.empty().subscribe(
         on_completed=self.long_running_task, 
         scheduler=self.pool_scheduler
      )

   def long_running_task(self):
      # your long running task here... eg:
      time.sleep(3)
      # if you want a callback on the main thread:
      self.root.after(5, self.on_task_complete)

   def on_task_complete(self):
       pass # runs on main thread

if __name__ == "__main__":
    ui = UI()
    ui.root.mainloop()

另一种使用这种结构的方法可能更简洁(取决于偏好):

tk.Button(text="Do Task", command=self.button_clicked).pack()

...

def button_clicked(self):

   def do_task(_):
      time.sleep(3) # runs on background thread
             
   def on_task_done():
      pass # runs on main thread

   rx.just(1).subscribe(
      on_next=do_task, 
      on_completed=lambda: self.root.after(5, on_task_done), 
      scheduler=self.pool_scheduler
   )

【讨论】:

以上是关于Tkinter:如何使用线程来防止主事件循环“冻结”的主要内容,如果未能解决你的问题,请参考以下文章

Tkinter:在主循环中调用事件

树莓派、tkinter 和线程

Python & Tkinter -> 关于调用冻结程序的长时间运行函数

Tkinter GUI 冻结 - 解除阻塞/线程的提示?

Tkinter 嵌套主循环

你如何在Tkinter的事件循环中运行自己的代码?