等到线程池进程完成后再提交下一个进程
Posted
技术标签:
【中文标题】等到线程池进程完成后再提交下一个进程【英文标题】:wait until thread pool process is finished before submitting the next process 【发布时间】:2021-11-12 16:44:09 【问题描述】:我正在尝试暂停一个线程,直到初始线程完成。
我的线程池开始如下:
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
def onStart(self,event):
self._quit = False
self.btn_output.Disable()
self.btn_watch.Disable()
self.btn_start.Disable()
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
def monitor_folder(self):
#wx.CallAfter(self.print1)
while self._quit == False:
#path_to_watch = os.path.abspath (".")
path_to_watch = self.lbl_watch.GetLabel()
# FindFirstChangeNotification sets up a handle for watching
# file changes. The first parameter is the path to be
# watched; the second is a boolean indicating whether the
# directories underneath the one specified are to be watched;
# the third is a list of flags as to what kind of changes to
# watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification (
path_to_watch,
0,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME
)
#
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
try:
old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
result = win32event.WaitForSingleObject (change_handle, 500)
# If the WaitFor... returned because of a notification (as
# opposed to timing out or some error) then look for the
# changes in the directory contents.
if result == win32con.WAIT_OBJECT_0:
new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
added = [f for f in new_path_contents if not f in old_path_contents]
deleted = [f for f in old_path_contents if not f in new_path_contents]
if added:
print ("Added: ", ", ".join (added))
#Get file type
a_string = ", ".join (added)
length = len(a_string)
fType = a_string[length - 4:]
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
if deleted: print ("Deleted: ", ", ".join (deleted))
old_path_contents = new_path_contents
win32file.FindNextChangeNotification (change_handle)
finally:
win32file.FindCloseChangeNotification (change_handle)
如果将两个或更多文件添加到目录中,我会在 thread_pool_executor.submit(self.process_csv,a_string)
上收到错误消息。怎样才能等到thread_pool_executor.submit(self.process_csv,a_string)
运行完再提交下一个进程?
我希望这 2 个继续运行。
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
编辑:
检测到两个或更多文件时
消息=(-2146777998,'OLE 错误 0x800ac472',无,无) 来源=C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py 堆栈跟踪: 文件“C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py”,第 41189 行,打开 ret = self.oleobj.InvokeTypes(1923, LCID, 1, (13, 0), ((8, 1), (12, 17), (12, 17), (12, 17 ), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17)),文件名 excel_to_pdf 中的文件“C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py”,第 247 行 wb = excel.Workbooks.Open(文件路径) 文件“C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py”,第 233 行,在 process_csv(当前帧) self.excel_to_pdf(df, a_string)
完整代码:
import os
import win32file
import win32event
import win32con
import pythoncom
from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx
import glob
from os.path import splitext
from concurrent import futures
import sys
import datetime
import time
import re
import configparser
config = configparser.ConfigParser()
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
directory = os.getcwd()
v_dict = 'ProjectNumber' : ['1|F6'],'Yield' : ['1|F5'],'Coil Lot Number' : ['1|D6'],'Coil Thickness' : ['1|D5'],'Web Profile' : ['1|H7'],'Operation Number' : ['1|F7'],'Label Readable?' :
['1|D26','2|E26','3|F26','4|G26','5|H26'],'ICCES Number?' : ['1|D27','2|E27','3|F27','4|G27','5|H27'],'Date of Test' : ['1|D7'],'Start Time of Test' :
['1|H5','1|D10','1|E10','1|F10','1|G10'],'End Time of Test' : ['1|H6','2|H10'],'Part Length' : ['1|D12','2|E12','3|F12','4|G12','5|H12'],'Web Width' :
['1|D13','2|E13','3|F13','4|G13','5|H13'],'Flare Far' : ['1|D14','2|E14','3|F14','4|G14','5|H14'],'Flare Near' : ['1|D15','2|E15','3|F15','4|G15','5|H15'],'Hole Location Width' :
['1|D16','2|E16','3|F16','4|G16','5|H16'],'Hole Location Length' : ['1|D17','2|E17','3|F17','4|G17','5|H17'],'Crown' : ['1|D18','2|E18','3|F18','4|G18','5|H18'],'Camber' :
['1|D19','2|E19','3|F19','4|G19','5|H19'],'Bow' : ['1|D20','2|E20','3|F20','4|G20','5|H20'],'Twist' : ['1|D21','2|E21','3|F21','4|G21','5|H21'],'Flange Width Far' :
['1|D22','2|E22','3|F22','4|G22','5|H22'],'Flange Width Near' : ['1|D23','2|E23','3|F23','4|G23','5|H23'],'Lip Length Far' : ['1|D24','2|E24','3|F24','4|G24','5|H24'],'Lip Length Near' :
['1|D25','2|E25','3|F25','4|G25','5|H25']
class MainFrame(wx.Frame):
def __init__(self, parent, title):
super(MainFrame, self).__init__(parent, title=title,size=(600,400))
global template
font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, True, u'Arial Narrow',)
font1 = wx.Font(8, wx.MODERN, wx.NORMAL, wx.NORMAL, False, u'Arial Narrow')
b_font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow')
self.panel = wx.Panel(self)
self.panel.SetBackgroundColour("light gray")
#Create sizers
vbox = wx.BoxSizer(wx.VERTICAL)
hbox1 = wx.BoxSizer(wx.HORIZONTAL)
hbox2 = wx.BoxSizer(wx.HORIZONTAL)
#Create widgets
self.st1 = wx.StaticText(self.panel, label='Script is not running.',style = wx.ALIGN_CENTRE)
self.lbl_watch = wx.StaticText(self.panel, label= os.path.abspath ("."), style=wx.ALIGN_LEFT)
self.lbl_output = wx.StaticText(self.panel, label=os.path.abspath ("."))
self.tc = wx.TextCtrl(self.panel, style= wx.TE_MULTILINE | wx.SUNKEN_BORDER | wx.TE_READONLY )
self.btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
self.btn_watch = wx.Button(self.panel, label='Select Folder to Watch')
self.btn_output = wx.Button(self.panel, label='Select Output Folder ')
self.btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
self.st1.SetForegroundColour((255,0,0)) # set text color
self.tc.SetFont(font1)
self.st1.SetFont(font)
self.btn_start.SetFont(b_font)
self.btn_start.Bind(wx.EVT_BUTTON, self.onStart)
self.btn_output.Bind(wx.EVT_BUTTON, self.choose_output)
self.btn_watch.Bind(wx.EVT_BUTTON, self.choose_watch)
hbox1.Add(self.btn_watch )
hbox1.Add(self.lbl_watch, 0 , wx.ALL | wx.EXPAND, 5)
hbox2.Add(self.btn_output)
hbox2.Add(self.lbl_output, 0 , wx.ALL | wx.EXPAND, 5)
vbox.Add(self.st1,-1 , wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(self.btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(self.tc,2, wx.EXPAND| wx.ALL, 10)
vbox.Add(hbox1,0, wx.EXPAND| wx.ALL, 10)
vbox.Add(hbox2,0, wx.EXPAND| wx.ALL, 10)
self.panel.SetSizer(vbox)
self.Centre()
self.Show()
template = self.resource_path('template.xlsx')
self.write_config()
def write_config(self):
#Write Config
global config
global config_default
if not os.path.exists(self.resource_path('config.ini')):
#config["DEFAULT"] =
# "watch_folder": os.path.abspath ("."),
# "output_folder": os.path.abspath (".")
config['DEFAULT'] = 'watch_folder': os.path.abspath ("."), 'output_folder': os.path.abspath (".")
config.write(open(self.resource_path('config.ini'), 'w'))
#Check if sections are blank
config.read(self.resource_path("config.ini"))
config_default = config["DEFAULT"]
if config_default["output_folder"] == '':
config_default["output_folder"] = os.path.abspath (".")
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
if config_default["watch_folder"] == '':
config_default["watch_folder"] = os.path.abspath (".")
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
#Update labels/paths
self.lbl_watch.SetLabel(config_default["watch_folder"])
self.lbl_output.SetLabel(config_default["output_folder"])
def onStart(self,event):
self._quit = False
self.btn_output.Disable()
self.btn_watch.Disable()
self.btn_start.Disable()
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
def choose_output(self, event):
message = 'Select Output Folder'
f_path = self.set_dir(message)
if f_path == '':
dlg = wx.MessageBox('Do you want to revert path to the default directory?' + '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Output Filepath?',wx.YES_NO | wx.ICON_QUESTION)
if dlg == wx.YES:
#Update output folder
f_path = os.getcwd()
else:
#f_path = self.lbl_output.SetLabel(f_path)
return
#Update output folder
self.lbl_output.SetLabel(f_path)
config_default["output_folder"] = f_path
#Write changes back to file
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
def choose_watch(self, event):
message = 'Select Watch Folder'
f_path = self.set_dir(message)
print(f_path)
if f_path == '':
dlg = wx.MessageBox('Do you want to revert path to the default directory?' + '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Watch Filepath?',wx.YES_NO | wx.ICON_QUESTION)
if dlg == wx.YES:
#Update watch folder
f_path = os.getcwd()
else:
#f_path = self.lbl_watch.SetLabel(f_path)
return
#Update watch folder
self.lbl_watch.SetLabel(f_path)
config_default["watch_folder"] = f_path
#Write changes back to file
with open(self.resource_path('config.ini'), 'w') as conf:
config.write(conf)
def set_dir(self, message):
dlg = wx.DirDialog(
self, message=message,
style=wx.DD_DEFAULT_STYLE)
# Show the dialog and retrieve the user response.
if dlg.ShowModal() == wx.ID_OK:
# load directory
path = dlg.GetPath()
else:
path = ''
# Destroy the dialog.
dlg.Destroy()
return path
def get_pdf_path(self):
folder = self.lbl_output.GetLabel()
if folder == '':
folder = os.getcwd()
self.lbl_output.SetLabel(folder)
config_default["output_folder"] = folder
fileName = 'Test Data Report ' + str(datetime.datetime.now().strftime("%Y_%m_%d__%H.%M.%S")) + '.pdf'# + "\n" %H:%M:%S"
#fileName = fileName.replace(':','.')
path =os.path.join(folder, fileName)
return path
def getDirectory(self, filename): # For Excel Template
# Construct path for file
current_work_dir = os.getcwd()
path = os.path.join(current_work_dir, filename)
return path
def getDirectoryCSV(self, filename): # For Watch Folder
# Construct path for file
current_work_dir = self.lbl_watch.GetLabel()
if current_work_dir == '':
current_work_dir = os.getcwd()
self.lbl_watch.SetLabel(current_work_dir)
config_default["watch_folder"] = current_work_dir
path = os.path.join(current_work_dir, filename)
return path
def process_csv(self,a_string):
data_file = self.getDirectoryCSV(a_string)
print(data_file)
df = pd.read_fwf(data_file, header=None)
df = df[0].str.split(',', expand=True)
df.set_index(0, inplace = True)
df.fillna("", inplace=True)
self.excel_to_pdf(df, a_string)
def excel_to_pdf(self, df,a_string):
# Open Microsoft Excel
pythoncom.CoInitialize()
excel = client.Dispatch("Excel.Application")
excel.Visible = False
excel.ScreenUpdating = False
excel.DisplayAlerts = False
excel.EnableEvents = False
# Read Excel File
filepath = self.getDirectory(template)
print (filepath)
wb = excel.Workbooks.Open(filepath)
work_sheets = wb.Worksheets('Form')
#Write to sheet
for key, items in v_dict.items():
row_id = key
for item in items:
cel = str(item.split('|')[1])
col_num = int(str(item.split('|')[0]))
work_sheets.Range(cel).Value = df.loc[row_id][col_num]
#Format
#print_area = 'C1:I64'
work_sheets.PageSetup.Zoom = False
work_sheets.PageSetup.FitToPagesTall = 1
work_sheets.PageSetup.TopMargin = 10
work_sheets.PageSetup.BottomMargin = 10
work_sheets.PageSetup.RightMargin = 10
work_sheets.PageSetup.LeftMargin = 10
#work_sheets.PageSetup.PrintArea = print_area
# Convert into PDF File
pdf_path = self.get_pdf_path()
work_sheets.ExportAsFixedFormat(0, pdf_path)
excel.ScreenUpdating = True
excel.DisplayAlerts = True
excel.EnableEvents = True
wb.Close(SaveChanges=False)
excel.Quit()
text = 'PDF CREATED: ' + pdf_path
self.tc.AppendText(text + "\n" + "\n")
def df_to_pdf(self,df):
#Convert to PDF
fig, ax =plt.subplots(figsize=(12,4))
ax.axis('tight')
ax.axis('off')
the_table = ax.table(cellText=df.values,colLabels=df.columns,loc='center')
pp = PdfPages("csv_data.pdf") # ADD DATE
pp.savefig(fig, bbox_inches='tight')
pp.close()
def active_listening(self):
font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow',)
self.st1.SetForegroundColour((0,128,0))
self.st1.SetFont(font)
m = 'Listening'
self.st1.SetLabel(m)
i = 1
while self._quit == False:
time.sleep(1)
if i <= 3:
m = m + "."
self.st1.SetLabel(m)
i = i + 1
else:
i = 1
m = 'Listening'
def monitor_folder(self):
#wx.CallAfter(self.print1)
while self._quit == False:
#path_to_watch = os.path.abspath (".")
path_to_watch = self.lbl_watch.GetLabel()
# FindFirstChangeNotification sets up a handle for watching
# file changes. The first parameter is the path to be
# watched; the second is a boolean indicating whether the
# directories underneath the one specified are to be watched;
# the third is a list of flags as to what kind of changes to
# watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification (
path_to_watch,
0,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME
)
#
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
try:
old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
result = win32event.WaitForSingleObject (change_handle, 500)
# If the WaitFor... returned because of a notification (as
# opposed to timing out or some error) then look for the
# changes in the directory contents.
if result == win32con.WAIT_OBJECT_0:
new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
added = [f for f in new_path_contents if not f in old_path_contents]
deleted = [f for f in old_path_contents if not f in new_path_contents]
if added:
print ("Added: ", ", ".join (added))
#Get file type
a_string = ", ".join (added)
length = len(a_string)
fType = a_string[length - 4:]
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
if deleted: print ("Deleted: ", ", ".join (deleted))
old_path_contents = new_path_contents
win32file.FindNextChangeNotification (change_handle)
finally:
win32file.FindCloseChangeNotification (change_handle)
def resource_path(self, relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def main():
app = wx.App()
ex = MainFrame(None, title='File Monitor')
ex.Show()
app.MainLoop()
if __name__ == '__main__':
main()
【问题讨论】:
【参考方案1】:这是threading.Semaphore
的经典用例
在每个不能同时运行的函数中,使用Semaphore(1)
将其全局(在主线程中)实例化。相应地使用 acquire()
和 release()
方法来序列化对“共享资源”的访问(在主 monitir_folder
函数或 process_csv
函数内,取决于您收到错误的位置 - 在这种情况下,CSV 处理内部状态
由于许可数量为 1,RLock
也很适合,但信号量更好地表达您想要实现的目标。
例子:
# Global declaration
semaphore = threading.Semaphore(1)
...
# Inside the monitor_folder function
semaphore.acquire()
try:
thread_pool_executor.submit(self.process_csv,a_string)
finally:
semaphore.release()
【讨论】:
我对信号量不熟悉。我会调查的。我用错误消息和完整代码更新了我的帖子。 在您的问题中,您询问了如何限制线程任务提交。虽然查看更多代码可能有助于找出问题所在,但直接使用Semaphore
将帮助您自行解决问题。如果您不确定如何使用它,请查看google.com/amp/s/www.geeksforgeeks.org/…
@cmccall95 我添加了一个示例。如果您觉得我的回答有帮助,请采纳。
哇这么多研究,我从来没有遇到过信号量。这么简单的污点。我肯定会继续使用它以上是关于等到线程池进程完成后再提交下一个进程的主要内容,如果未能解决你的问题,请参考以下文章
python全栈脱产第37天------进程池与线程池协程gevent模块单线程下实现并发的套接字通信