如何在 PyQt5 中对 QProcesses 进行排队?
Posted
技术标签:
【中文标题】如何在 PyQt5 中对 QProcesses 进行排队?【英文标题】:How do I queue QProcesses in PyQt5? 【发布时间】:2018-07-16 14:49:46 【问题描述】:我想在 PyQt5 中对 QProcess 进行排队,或者在仍然使用 readAll() 读取标准输出的同时简单地阻塞。相当于 subprocess.call 而不是 subprocess.Pop。当使用 waitForFinished() 时,带有 readAll() 的 stdout 将在进程结束时立即出现,而不是在处理时流出。
示例脚本:
from PIL import Image
import numpy as np
import sys
from PyQt5 import QtGui,QtCore, QtWidgets
class gui(QtWidgets.QMainWindow):
def __init__(self):
super(gui, self).__init__()
self.initUI()
def dataReady(self):
cursor = self.output.textCursor()
cursor.movePosition(cursor.End)
cursor.insertText(str(self.process.readAll(), "utf-8"))
self.output.ensureCursorVisible()
def callProgram(self):
# run the process
# `start` takes the exec and a list of argument
filepath = 'path\image.tif'
self.process.start('some_command filepath'])
# This will output a file image.tif specified by filepath:
# Import file and do stuff to it:
# E.g.
im = Image.open('filepath')
imarray = np.array(im)
# Get image extents as argument to next process:
ext = str(imarray.size)
imarray[imarray == 10] = 5
# Save changes
im = Image.fromarray(imarray)
im.save(filepath)
# Now the image has been updated and should be in a new process below
cmd = 'some_other_command' + filepath + ext
self.process.start(cmd)
# Same thing goes on here:
self.process.start('some_command filepath')
# Import file once again
im = Image.open('filepath')
imarray[imarray == 10] = 5
# Save changes
im = Image.fromarray(imarray)
im.save(filepath)
def initUI(self):
layout = QtWidgets.QHBoxLayout()
self.runButton = QtWidgets.QPushButton('Run')
self.runButton.clicked.connect(self.callProgram)
self.output = QtWidgets.QTextEdit()
layout.addWidget(self.output)
layout.addWidget(self.runButton)
centralWidget = QtWidgets.QWidget()
centralWidget.setLayout(layout)
self.setCentralWidget(centralWidget)
# QProcess object for external app
self.process = QtCore.QProcess(self)
# QProcess emits `readyRead` when there is data to be read
self.process.readyRead.connect(self.dataReady)
# Just to prevent accidentally running multiple times
# Disable the button when process starts, and enable it when it finishes
self.process.started.connect(lambda: self.runButton.setEnabled(False))
self.process.finished.connect(lambda: self.runButton.setEnabled(True))
#Function Main Start
def main():
app = QtWidgets.QApplication(sys.argv)
ui=gui()
ui.show()
sys.exit(app.exec_())
#Function Main END
if __name__ == '__main__':
main()
【问题讨论】:
@eyllanesc 任务 n 在任务 n-1 之后执行。感谢您的帮助! @eyllanesc 例如如果我运行 self.process.start('ping',['127.0.0.1']),如果我 readAll(),它将在标准输出中实时输出打印语句。我想这样做的过程是这样的:任务 n 在任务 n-1 之后执行。但是,虽然它们仍然实时输出到标准输出,但我无法使用 waitForFinished(),因为在每个过程完成后都会打印整个输出。 @eyllanesc 原因是我用 QProcess 调用了加载条的输出增量。如果所有这些增量仅在使用 waitForFinished() 过程完成时才打印,它会破坏加载栏的目的。我使用 QProcess 有多个调用,它们不能并行运行,因为输出文件在另一个中使用。 @eyllanesc 在我的原始脚本中,我有许多 subprocess.call() 一个接一个,每个都输出下一个所需的文件(这就是我不能使用 subprocess.Popen() 的原因)。我想将此转移到 QProcess,因为我想在我的 UI 中为加载栏使用实时打印语句。 @eyllanesc 完成。 【参考方案1】:这种情况下的解决方案是创建一个TaskManager
类,负责处理任务之间的顺序。
import sys
from PyQt5 import QtCore, QtWidgets
from functools import partial
class TaskManager(QtCore.QObject):
started = QtCore.pyqtSignal()
finished = QtCore.pyqtSignal()
progressChanged = QtCore.pyqtSignal(int, QtCore.QByteArray)
def __init__(self, parent=None):
QtCore.QObject.__init__(self, parent)
self._process = QtCore.QProcess(self)
self._process.finished.connect(self.handleFinished)
self._progress = 0
def start_tasks(self, tasks):
self._tasks = iter(tasks)
self.fetchNext()
self.started.emit()
self._progress = 0
def fetchNext(self):
try:
task = next(self._tasks)
except StopIteration:
return False
else:
self._process.start(*task)
return True
def processCurrentTask(self):
output = self._process.readAllStandardOutput()
self._progress += 1
self.progressChanged.emit(self._progress, output)
def handleFinished(self):
self.processCurrentTask()
if not self.fetchNext():
self.finished.emit()
class gui(QtWidgets.QMainWindow):
def __init__(self):
super(gui, self).__init__()
self.initUI()
def dataReady(self, progress, result):
self.output.append(str(result, "utf-8"))
self.progressBar.setValue(progress)
def callProgram(self):
tasks = [("ping", ["8.8.8.8"]),
("ping", ["8.8.8.8"]),
("ping", ["8.8.8.8"])]
self.progressBar.setMaximum(len(tasks))
self.manager.start_tasks(tasks)
def initUI(self):
layout = QtWidgets.QVBoxLayout()
self.runButton = QtWidgets.QPushButton('Run')
self.runButton.clicked.connect(self.callProgram)
self.output = QtWidgets.QTextEdit()
self.progressBar = QtWidgets.QProgressBar()
layout.addWidget(self.output)
layout.addWidget(self.runButton)
layout.addWidget(self.progressBar)
centralWidget = QtWidgets.QWidget()
centralWidget.setLayout(layout)
self.setCentralWidget(centralWidget)
self.manager = TaskManager(self)
self.manager.progressChanged.connect(self.dataReady)
self.manager.started.connect(partial(self.runButton.setEnabled, False))
self.manager.finished.connect(partial(self.runButton.setEnabled, True))
def main():
app = QtWidgets.QApplication(sys.argv)
ui=gui()
ui.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
更新:
概括问题,可以说第n个进程需要默认参数和附加参数。
默认参数是独立且固定的
附加参数依赖于之前通过某个函数的过程。
因此可以使用以下表达式进行概括:
result_n = process_n(default_arguments, additional_args_n)`
additional_args_n = fun_n(result_n-1)`
或使用下图:
________ _________ ________ _________ ________
| | | | | | | | | |
| | | | | | | | | |
| TASK-1 |--->| FUN1TO2 |--->| TASK-2 |--->| FUN2TO3 |--->| TASK-3 |
| | | | | | | | | |
|________| |_________| |________| |_________| |________|
然后为了构造这个过程,创建了以下字典:
task_n = "program": program, "args": default_arguments, "function": fun
其中fun
是用于处理此任务的输出以获得下一个任务的附加参数的函数。
在以下示例中,我将使用scriptX.py
作为程序而不是ping
。
#script1.py
import sys
def foo(*args):
v, = args
return "1-"+"".join(v)
arg = sys.argv[1:]
print(foo(arg))
#script2.py
import sys
def foo(*args):
v, = args
return "2-"+"".join(v)
arg = sys.argv[1:]
print(foo(arg))
#script3.py
import sys
def foo(*args):
v, = args
return "3-"+"".join(v)
arg = sys.argv[1:]
print(foo(arg))
fun1to2
是使用 process-1 的结果来生成 process-2 所需的附加参数并且必须返回它的函数。 fun2to3
的类似情况
def fun1to2(*args):
return "additional_arg_for_process2_from_result1"
def fun2to3(*args):
return "additional_arg_for_process3_from_result2"
所以基于以上我们创建任务:
tasks = ["program": "python", "args": ["scripts/script1.py", "default_argument1"], "function": fun1to2,
"program": "python", "args": ["scripts/script2.py", "default_argument2"], "function": fun2to3,
"program": "python", "args": ["scripts/script3.py", "default_argument3"]]
综合以上所有,最终实现如下:
import sys
from PyQt5 import QtCore, QtWidgets
from functools import partial
class TaskManager(QtCore.QObject):
started = QtCore.pyqtSignal()
finished = QtCore.pyqtSignal()
progressChanged = QtCore.pyqtSignal(int, QtCore.QByteArray)
def __init__(self, parent=None):
QtCore.QObject.__init__(self, parent)
self._process = QtCore.QProcess(self)
self._process.finished.connect(self.handleFinished)
self._progress = 0
self._currentTask = None
def start_tasks(self, tasks):
self._tasks = iter(tasks)
self.fetchNext()
self.started.emit()
self._progress = 0
def fetchNext(self, additional_args=None):
try:
self._currentTask = next(self._tasks)
except StopIteration:
return False
else:
program = self._currentTask.get("program")
args = self._currentTask.get("args")
if additional_args is not None:
args += additional_args
self._process.start(program, args)
return True
def processCurrentTask(self):
output = self._process.readAllStandardOutput()
self._progress += 1
fun = self._currentTask.get("function")
res = None
if fun:
res = fun(output)
self.progressChanged.emit(self._progress, output)
return res
def handleFinished(self):
args = self.processCurrentTask()
if not self.fetchNext(args):
self.finished.emit()
def fun1to2(args):
return "-additional_arg_for_process2_from_result1"
def fun2to3(args):
return "-additional_arg_for_process3_from_result2"
class gui(QtWidgets.QMainWindow):
def __init__(self):
super(gui, self).__init__()
self.initUI()
def dataReady(self, progress, result):
self.output.append(str(result, "utf-8"))
self.progressBar.setValue(progress)
def callProgram(self):
tasks = ["program": "python", "args": ["scripts/script1.py", "default_argument1"], "function": fun1to2,
"program": "python", "args": ["scripts/script2.py", "default_argument2"], "function": fun2to3,
"program": "python", "args": ["scripts/script3.py", "default_argument3"]]
self.progressBar.setMaximum(len(tasks))
self.manager.start_tasks(tasks)
def initUI(self):
layout = QtWidgets.QVBoxLayout()
self.runButton = QtWidgets.QPushButton('Run')
self.runButton.clicked.connect(self.callProgram)
self.output = QtWidgets.QTextEdit()
self.progressBar = QtWidgets.QProgressBar()
layout.addWidget(self.output)
layout.addWidget(self.runButton)
layout.addWidget(self.progressBar)
centralWidget = QtWidgets.QWidget()
centralWidget.setLayout(layout)
self.setCentralWidget(centralWidget)
self.manager = TaskManager(self)
self.manager.progressChanged.connect(self.dataReady)
self.manager.started.connect(partial(self.runButton.setEnabled, False))
self.manager.finished.connect(partial(self.runButton.setEnabled, True))
def main():
app = QtWidgets.QApplication(sys.argv)
ui=gui()
ui.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
结果:
【讨论】:
非常感谢!按预期工作!但是,如果让我们说第一个任务输出一个我想在启动下一个任务之前导入并执行操作的文件。这可能吗? 第一个进程输出一个文件,我将其作为数组导入并对其进行更改。然后我将该数组保存为文件并在下一个过程中使用该文件。 再次尝试更新!很抱歉对于这个误会!是的,您完全正确:您是否希望第 n 个进程将一些取决于第 (n-1) 个进程的输出的数据作为参数? 我希望和你说的完全一样:第 n 个进程将一些数据作为参数,这些数据取决于第 (n-1) 个进程的输出! @Natetronn 在我的回答中是启动 (n + 1) qprocess 的第 n 个完成的 qprocess 信号。我的回答你有什么不明白的地方?以上是关于如何在 PyQt5 中对 QProcesses 进行排队?的主要内容,如果未能解决你的问题,请参考以下文章