如何在 tkinter 旁边运行循环? - Python 二维码阅读器图形用户界面

Posted

技术标签:

【中文标题】如何在 tkinter 旁边运行循环? - Python 二维码阅读器图形用户界面【英文标题】:How can you run loops alongside tkinter? - Python QR code reader GUI 【发布时间】:2021-10-27 19:00:39 【问题描述】:

我正在尝试编写一个也可以处理二维码的 Tkinter 应用程序。为此,我需要循环检查 QR 码是否有效并且我需要发出发布请求。我完全意识到我编码的方式非常低效。有一个更好的方法吗?这是我目前所拥有的:

import cv2
import tkinter as tk
from PIL import Image, ImageTk
import sys
import os
import pyzbar.pyzbar as zbar
import threading
import requests
import queue

result = []
decodedCode = ""

logo     = "logo.png"
settings = "settings.png"

if os.environ.get('DISPLAY','') == '':
    print('no display found. Using :0.0')
    os.environ.__setitem__('DISPLAY', ':0.0')


#create main window
master = tk.Tk()
master.title("tester")
master.geometry("480x800")
master.configure(bg='white')
ttelogo = tk.PhotoImage(file = logo)
settingslogo = tk.PhotoImage(file = settings)

#settings button
settings_frame = tk.Frame(master,width=50,height=50,bg="white")
settings_frame.pack_propagate(0) # Stops child widgets of label_frame from resizing it
settingsBtn = tk.Button(settings_frame, image=settingslogo).pack()
settings_frame.place(x=430,y=0)

#logo
img = tk.Label(master, image=ttelogo, bg='white')
img.image = ttelogo
img.place(x=176.5,y=10)

#Name Label
label_frame = tk.Frame(master,width=400,height=100,bg="white")
label_frame.pack_propagate(0) # Stops child widgets of label_frame from resizing it
tk.Label(label_frame,bg="white",fg="black",text="John Smith Smithington III",font=("Calibri",22)).pack()
label_frame.place(x=40,y=140)

#Instructions Label
instructions_frame = tk.Frame(master,width=440,height=100,bg="white")
instructions_frame.pack_propagate(0) # Stops child widgets of label_frame from resizing it
tk.Label(instructions_frame,bg="white",fg="black",text="Place your pass under the scanner below.",font=("Calibri",10)).pack()
instructions_frame.place(x=20,y=210)

#Camera Window
cameraFrame = tk.Frame(master, width=440, height=480)
cameraFrame.place(x=20, y=260)

#Camera Feed
lmain = tk.Label(cameraFrame)
lmain.place(x=0, y=0)
cap = cv2.VideoCapture(0)

def startScanning():
    global cap
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)
    cv2image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
    img = Image.fromarray(cv2image)
    imgtk = ImageTk.PhotoImage(image=img)
    lmain.imgtk = imgtk
    lmain.configure(image=imgtk)
    lmain.after(10, startScanning)

def processScan():
    global decodedCode
    stopped = False
    delay = 1
    
    while(True):
        ret = cv2.waitKey(delay) & 0xFF
        if ret == ord('c'): # continue
            stopped = False
            delay = 1
        if ret == ord('q'):
            break
        if stopped or (ret == ord('s')): # stop
            stopped = True
            delay = 30
            continue

        # Capture frame-by-frame
        ret, frame = cap.read()

        decodedObjects = zbar.decode(frame)
        if len(decodedObjects) > 0:
            stopped = True
            for code in decodedObjects:
                #print("Data", obj.data)
                #API Calls
                decodedCode = code.data.decode('utf-8')

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()

def checkCode():
    global decodedCode
    while True:
        if decodedCode != "":
            print (decodedCode)
            result = requests.post("https://example.com/login/index.php", data='action': 'validate_scan', 'uuid': decodedCode).text
            print(result)
            decodedCode = ""

startScanning()  #Display 2
threading.Thread(name='background', target=processScan).start()
threading.Thread(name='background2', target=checkCode).start()
master.mainloop()  #Starts GUI

编辑:队列形式的新版本:

# import all the necessary modules
from tkinter import *
from multiprocessing import Process, Queue
from queue import Empty  # for excepting a specific error

import numpy as np
import cv2
from PIL import Image, ImageTk
import sys
import os
import pyzbar.pyzbar as zbar
import threading
import requests


# this is the function that will be run in a child process
def processScan(queue_):  # pass the queue as an argument
    stopped = False
    delay = 1
    
    while(True):
        ret = cv2.waitKey(delay) & 0xFF
        if ret == ord('c'): # continue
            stopped = False
            delay = 1
        if ret == ord('q'):
            break
        if stopped or (ret == ord('s')): # stop
            stopped = True
            delay = 30
            continue

        # Capture frame-by-frame
        ret, frame = cap.read()

        decodedObjects = zbar.decode(frame)
        if len(decodedObjects) > 0:
            stopped = True
            for code in decodedObjects:
                #print("Data", obj.data)
                #API Calls
                queue_.put(code.data.decode('utf-8'))

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()
    #master.after(2000, processScan)
    #return r


# just a function to not write a `lambda`, just easier
# to read code
def startScanning():
    global cap
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)
    cv2image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
    img = Image.fromarray(cv2image)
    imgtk = ImageTk.PhotoImage(image=img)
    lmain.imgtk = imgtk
    lmain.configure(image=imgtk)
    lmain.after(10, startScanning)
    #processScan()
    #threading.Thread(name='background', target=processScan).start()
    # set process `daemon = True` so that it gets terminated with the
    # main process, this approach is not suggested if you write to file
    # but otherwise it shouldn't cause any issues (maybe an error but
    # that probably can be handled with `try/except`)
    Process(target=processScan, args=(queue, ), daemon=True).start()


# here is the loop for updating the label
# basically get the info from the queue
def update_label():
    try:
        # try getting data but since it is `block=False`
        # if there is nothing in the queue it will not block
        # this process waiting for data to appear in the queue
        # but it will raise the Empty error
        data = queue.get(block=False)
    except Empty:
        pass
    else:
        # if no error was raised just config
        # label to new data
        labelFrame.config(text=data)
    finally:
        # and simply schedule this function to run again in
        # 100 milliseconds (this btw is not recursion)
        master.after(100, update_label)

# crucial part is to use this if statement because
# child processes run this whole script again
if __name__ == '__main__':
    master = Tk()

    logo     = "logo.png"
    settings = "settings.png"

    if os.environ.get('DISPLAY','') == '':
        print('no display found. Using :0.0')
        os.environ.__setitem__('DISPLAY', ':0.0')


    #create main window
    master.attributes("-fullscreen", True)
    master.title("Title")
    master.geometry("480x800")
    master.configure(bg='white')
    ttelogo = PhotoImage(file = logo)
    settingslogo = PhotoImage(file = settings)

    #settings button
    settings_frame = Frame(master,width=50,height=50,bg="white")
    settings_frame.pack_propagate(0) # Stops child widgets of label_frame from resizing it
    settingsBtn = Button(settings_frame, image=settingslogo).pack()
    settings_frame.place(x=430,y=0)

    #logo
    img = Label(master, image=ttelogo, bg='white')
    img.image = ttelogo
    img.place(x=176.5,y=10)

    #Name Label
    label_frame = Frame(master,width=400,height=100,bg="white")
    label_frame.pack_propagate(0) # Stops child widgets of label_frame from resizing it
    Label(label_frame,bg="white",fg="black",text="John Smith Smithington III",font=("Calibri",22)).pack()
    label_frame.place(x=40,y=140)

    #Instructions Label
    instructions_frame = Frame(master,width=440,height=100,bg="white")
    instructions_frame.pack_propagate(0) # Stops child widgets of label_frame from resizing it
    Label(instructions_frame,bg="white",fg="black",text="Place your pass under the scanner below.",font=("Calibri",15)).pack()
    instructions_frame.place(x=20,y=210)

    #Camera Window
    cameraFrame = Frame(master, width=440, height=480)
    cameraFrame.place(x=20, y=260)

    #Camera Feed
    lmain = Label(cameraFrame)
    lmain.place(x=0, y=0)
    cap = cv2.VideoCapture(0)

    
    # define queue (since it is a global variable now
    # it can be easily used in the functions)
    queue = Queue()
    
    #label = Label(root)
    #label.pack()
    # initially start the update function
    update_label()
    
    # just a button for starting the process, but you can also simply
    # call the function
    #Button(root, text='Start', command=start_process).pack()
    startScanning()

    master.mainloop()

仍然遇到错误。也不确定这是否是正确的队列语法。相机源不是实时的。目前只显示一张静态图片。

【问题讨论】:

一种优化是使用queue 将代码从processScan 发送到checkCode。这样,checkCode 可以在无事可做时阻塞,而不是在紧密循环中占用 100% 的 CPU。 您可以提供@TimRoberts 示例的任何更改吗?我之前尝试过排队,但运气不佳。谢谢! 我还注意到,在运行这两个线程时,CPU 几乎处于 200%……目前效率非常低。我只是不熟悉如何与 tkinter 一起运行。我尝试使用“master.after()”方法,但这也不起作用。 对于 cpu 繁重的任务我听说multiprocessing 更好,因为它跨越多个内核的进程,但是tkinter 本身不能在任何子进程上运行,只能在计算部分甚至那么只有可以腌制的对象,也适用于队列,那么你应该使用multiprocessing.Queue而不是queue.Queue @Matiiss 你能给我举个例子吗?我之前尝试过使用队列,但运气不佳。 【参考方案1】:

这和multiprocessing 一样简单(代码 cmets 中的解释(这就是为什么如果删除 cmets 代码可能看起来有很多事情发生,它大约只有 40 行代码)对于这个例子)):

# import all the necessary modules
from tkinter import Tk, Label, Button
from multiprocessing import Process, Queue
from queue import Empty  # for excepting a specific error


# this is the function that will be run in a child process
def count(queue_):  # pass the queue as an argument
    # these modules are just to show the functionality
    # but time might be necessary otherwise
    # if theses are imported in the global namespace
    # then you don't need to import them here again
    import itertools
    import time
    for i in itertools.count():
        # do the main looping
        # put data in the queue
        queue_.put(i)
        # you may not need to use sleep here depending
        # on how long it takes for the
        # loop to finish the iteration
        # the issue is that the loop may finish
        # way faster than the update function runs
        # and that will fill up the queue
        # I think the queue size can be limited
        # but putting sleep like this to allow
        # the update loop to update will work too
        time.sleep(0.2)


# just a function to not write a `lambda`, just easier
# to read code
def start_process():
    # set process `daemon = True` so that it gets terminated with the
    # main process, this approach is not suggested if you write to file
    # but otherwise it shouldn't cause any issues (maybe an error but
    # that probably can be handled with `try/except`)
    Process(target=count, args=(queue, ), daemon=True).start()


# here is the loop for updating the label
# basically get the info from the queue
def update_label():
    try:
        # try getting data but since it is `block=False`
        # if there is nothing in the queue it will not block
        # this process waiting for data to appear in the queue
        # but it will raise the Empty error
        data = queue.get(block=False)
    except Empty:
        pass
    else:
        # if no error was raised just config
        # label to new data
        label.config(text=data)
    finally:
        # and simply schedule this function to run again in
        # 100 milliseconds (this btw is not recursion)
        root.after(100, update_label)


# crucial part is to use this if statement because
# child processes run this whole script again
if __name__ == '__main__':
    root = Tk()
    
    # define queue (since it is a global variable now
    # it can be easily used in the functions)
    queue = Queue()
    
    label = Label(root)
    label.pack()
    # initially start the update function
    update_label()
    
    # just a button for starting the process, but you can also simply
    # call the function
    Button(root, text='Start', command=start_process).pack()

    root.mainloop()

我对@9​​87654323@ 不太熟悉,所以它可能在子进程中不起作用(我想我已经这样做了),在这种情况下,您将不得不使用threading 或使用subprocess 的两种替代方法:将opencv 部分放在另一个文件中,并使用subprocess.Popen 中的python 解释器运行该文件(不适用于未安装Python 的计算机)或转换其他python将带有opencv 部分的文件添加到.exe 文件或某事并在subprocess.Popen 中运行该可执行文件(这两种方法都可能是不必要的,您可能会使用multiprocessingthreading

要改用threading,请更改这些行:

# from multiprocessing import Process, Queue
from threading import Thread
from queue import Empty, Queue

# Process(target=count, args=(queue, ), daemon=True).start()
Thread(target=count, args=(queue, ), daemon=True).start()

【讨论】:

多线程在这里应该可以正常工作。不需要多处理。应避免显式子流程。 @ChristophRackwitz OP 说 CPU 使用率高,multiprocessing 应该限制的东西,不是吗?无论如何,如果使用队列,那么唯一的区别是使用来自queue 模块的队列并将Process 替换为来自threading 库的Thread,如果你想使用Thread,基本上改变为3 行。否则,除了Process 会使用自己的资源并且不会减慢tkinter 的速度(出于好奇)之外,这种情况是否真的有区别?你也是说不应该以这种方式使用subprocess.Popen(如我的回答中所述)吗? 嗯,你需要了解你的工作量。 multiprocessing 不会以任何方式降低 CPU 利用率。从技术上讲,它实际上增加了开销。它所做的是将处理需求分散到其他 CPU。扫描 QR 码的图像是一项耗时的任务。如果要降低 CPU 负载,则需要降低每秒帧率。 @TimRoberts 好的,我想我终于明白了。 multiprocessing 适用于 CPU 绑定任务,而 threading - 适用于 IO 绑定任务。据我了解,检查 QR 码的任务将是 IO 绑定的,但是这两个模块都不是为了减少 CPU 使用率?它们只允许同时运行代码(在使用 tkinter 之类的东西时是必需的(除非循环不需要太多时间))并且可以减少执行任务所需的时间,但是为了减少 CPU 使用率,代码应该被优化或对于这种情况,运行次数更少? @Matiss tkinter 不需要 CPU。 tkinter 进程花费时间等待点击和击键。

以上是关于如何在 tkinter 旁边运行循环? - Python 二维码阅读器图形用户界面的主要内容,如果未能解决你的问题,请参考以下文章

如何在另一个任务旁边运行 tkinter 的 after event?

Tkinter 和 USB 设备检测

你如何在Tkinter的事件循环中运行自己的代码?

如何在 Kivy GUI 旁边运行 Tornado 事件循环?

在 tkinter python 中的输入框旁边打包标签

如何在不中断 tkinter 主循环的情况下运行一个函数,同时将该函数的信息发送到我的主循环中的小部件?