Wait until thread pool process is finished before submitting the next process

Posted: 2021-11-12 16:44:09

Question:

I'm trying to pause a thread until the initial thread has finished.

My thread pool is started as follows:

thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)

def onStart(self,event): 
        self._quit = False
        self.btn_output.Disable()
        self.btn_watch.Disable()
        self.btn_start.Disable()

        thread_pool_executor.submit(self.monitor_folder)
        thread_pool_executor.submit(self.active_listening)

def monitor_folder(self):
        #wx.CallAfter(self.print1)
        while self._quit == False:
            #path_to_watch = os.path.abspath (".")
            path_to_watch = self.lbl_watch.GetLabel()
            
            # FindFirstChangeNotification sets up a handle for watching
            #  file changes. The first parameter is the path to be
            #  watched; the second is a boolean indicating whether the
            #  directories underneath the one specified are to be watched;
            #  the third is a list of flags as to what kind of changes to
            #  watch for. We're just looking at file additions / deletions.
            #
            change_handle = win32file.FindFirstChangeNotification (
              path_to_watch,
              0,
              win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            #
            # Loop forever, listing any file changes. The WaitFor... will
            #  time out every half a second allowing for keyboard interrupts
            #  to terminate the loop.
            try:
              old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
              while 1:
                result = win32event.WaitForSingleObject (change_handle, 500)
                # If the WaitFor... returned because of a notification (as
                #  opposed to timing out or some error) then look for the
                #  changes in the directory contents.
                if result == win32con.WAIT_OBJECT_0:
                  new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
                  added = [f for f in new_path_contents if not f in old_path_contents]
                  deleted = [f for f in old_path_contents if not f in new_path_contents]
                  if added: 
                      print ("Added: ", ", ".join (added))
              
                      #Get file type
                      a_string = ", ".join (added)
                      length = len(a_string)
                      fType = a_string[length - 4:]

                      if fType == ".csv" or fType == ".txt":
                        data_file = a_string                        
                        thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
                        

                  if deleted: print ("Deleted: ", ", ".join (deleted))

                  old_path_contents = new_path_contents
                  win32file.FindNextChangeNotification (change_handle)

            finally:
              win32file.FindCloseChangeNotification (change_handle)

If two or more files are added to the directory, I get an error at thread_pool_executor.submit(self.process_csv,a_string). How can I wait until thread_pool_executor.submit(self.process_csv,a_string) has finished running before submitting the next process?
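For reference, `submit()` in `concurrent.futures` returns a `Future` immediately, and calling `.result()` on that future blocks the calling thread until the task has finished (re-raising any exception the task raised). A minimal sketch, assuming a hypothetical `process_file` stand-in for `process_csv` and a plain list of names instead of the folder watcher:

```python
import time
from concurrent import futures

pool = futures.ThreadPoolExecutor(max_workers=5)
finished = []  # order in which the jobs completed

def process_file(name):
    # Hypothetical stand-in for process_csv: pretend to do slow work
    time.sleep(0.05)
    finished.append(name)
    return name.upper()

def handle_new_files(names):
    results = []
    for name in names:
        future = pool.submit(process_file, name)
        # result() blocks until this job is done (and re-raises any
        # exception it raised) before the next file is submitted
        results.append(future.result())
    return results

print(handle_new_files(["a.csv", "b.csv", "c.csv"]))  # ['A.CSV', 'B.CSV', 'C.CSV']
pool.shutdown()
```

Inside `monitor_folder`, `submit(...).result()` behaves much like calling `self.process_csv(a_string)` directly: the watcher thread pauses while one file is processed, while `active_listening` keeps running in its own worker.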

I want these two to keep running:

thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
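If the watcher should keep watching instead of blocking, one alternative is a second executor with `max_workers=1` reserved for the file jobs: its single worker runs queued jobs strictly one after another, while the two long-running loops stay in the original pool. A sketch under that assumption; `listener`, `watcher`, `process_file` and `excel_pool` are hypothetical stand-ins for the real methods and pool:

```python
import threading
import time
from concurrent import futures

# Pool for the long-running loops (monitor_folder, active_listening)
main_pool = futures.ThreadPoolExecutor(max_workers=5)
# Hypothetical second pool with ONE worker: jobs queue up and run one at a time
excel_pool = futures.ThreadPoolExecutor(max_workers=1)

stop = threading.Event()
lock = threading.Lock()
running = 0      # jobs currently inside process_file
max_running = 0  # highest value of `running` ever seen

def process_file(name):
    # Stand-in for process_csv; tracks how many jobs overlap
    global running, max_running
    with lock:
        running += 1
        max_running = max(max_running, running)
    time.sleep(0.02)
    with lock:
        running -= 1
    return name

def listener():
    # Stand-in for active_listening: keeps running until told to stop
    while not stop.is_set():
        time.sleep(0.01)

def watcher(names):
    # Stand-in for monitor_folder: hands jobs off and moves on without waiting
    return [excel_pool.submit(process_file, n) for n in names]

main_pool.submit(listener)
jobs = main_pool.submit(watcher, ["a.csv", "b.csv", "c.csv"]).result()
done = [job.result() for job in jobs]
stop.set()
main_pool.shutdown()
excel_pool.shutdown()
print(done, max_running)  # ['a.csv', 'b.csv', 'c.csv'] 1
```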

Edit:

When two or more files are detected:

Message=(-2146777998, 'OLE error 0x800ac472', None, None)
Source=C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py
StackTrace:
 File "C:\Users\Apache Paint\AppData\Local\Temp\gen_py\3.8\00020813-0000-0000-C000-000000000046x0x1x9.py", line 41189, in Open
    ret = self.oleobj.InvokeTypes(1923, LCID, 1, (13, 0), ((8, 1), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17), (12, 17)),Filename
 File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 247, in excel_to_pdf
    wb = excel.Workbooks.Open(filepath)
 File "C:\Users\Apache Paint\source\repos\Monitor_Folder\Monitor_Folder.py", line 233, in process_csv (Current frame)
    self.excel_to_pdf(df, a_string)

Full code:

import os
import win32file
import win32event
import win32con
import pythoncom

from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx

import glob
from os.path import splitext
from concurrent import futures
import sys
import datetime
import time
import re
import configparser

config = configparser.ConfigParser()

thread_pool_executor = futures.ThreadPoolExecutor(max_workers=5)
directory = os.getcwd()


v_dict = {'ProjectNumber' : ['1|F6'],'Yield' : ['1|F5'],'Coil Lot Number' : ['1|D6'],'Coil Thickness' : ['1|D5'],'Web Profile' : ['1|H7'],'Operation Number' : ['1|F7'],'Label Readable?' : 
                  ['1|D26','2|E26','3|F26','4|G26','5|H26'],'ICCES Number?' : ['1|D27','2|E27','3|F27','4|G27','5|H27'],'Date of Test' : ['1|D7'],'Start Time of Test' : 
                  ['1|H5','1|D10','1|E10','1|F10','1|G10'],'End Time of Test' : ['1|H6','2|H10'],'Part Length' : ['1|D12','2|E12','3|F12','4|G12','5|H12'],'Web Width' : 
                  ['1|D13','2|E13','3|F13','4|G13','5|H13'],'Flare Far' : ['1|D14','2|E14','3|F14','4|G14','5|H14'],'Flare Near' : ['1|D15','2|E15','3|F15','4|G15','5|H15'],'Hole Location Width' :
                 ['1|D16','2|E16','3|F16','4|G16','5|H16'],'Hole Location Length' : ['1|D17','2|E17','3|F17','4|G17','5|H17'],'Crown' : ['1|D18','2|E18','3|F18','4|G18','5|H18'],'Camber' : 
                 ['1|D19','2|E19','3|F19','4|G19','5|H19'],'Bow' : ['1|D20','2|E20','3|F20','4|G20','5|H20'],'Twist' : ['1|D21','2|E21','3|F21','4|G21','5|H21'],'Flange Width Far' : 
                 ['1|D22','2|E22','3|F22','4|G22','5|H22'],'Flange Width Near' : ['1|D23','2|E23','3|F23','4|G23','5|H23'],'Lip Length Far' : ['1|D24','2|E24','3|F24','4|G24','5|H24'],'Lip Length Near' : 
                 ['1|D25','2|E25','3|F25','4|G25','5|H25']}

class MainFrame(wx.Frame):
    def __init__(self, parent, title):
        super(MainFrame, self).__init__(parent, title=title,size=(600,400))
        global template

        font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, True, u'Arial Narrow',)
        font1 = wx.Font(8, wx.MODERN, wx.NORMAL, wx.NORMAL, False, u'Arial Narrow')
        b_font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow')
        
        self.panel = wx.Panel(self)
        self.panel.SetBackgroundColour("light gray")
        #Create sizers
        vbox = wx.BoxSizer(wx.VERTICAL)
        hbox1 = wx.BoxSizer(wx.HORIZONTAL)
        hbox2 = wx.BoxSizer(wx.HORIZONTAL)
        #Create widgets
        self.st1 = wx.StaticText(self.panel, label='Script is not running.',style = wx.ALIGN_CENTRE)
        self.lbl_watch = wx.StaticText(self.panel, label= os.path.abspath ("."), style=wx.ALIGN_LEFT)
        self.lbl_output = wx.StaticText(self.panel, label=os.path.abspath ("."))
        self.tc = wx.TextCtrl(self.panel, style= wx.TE_MULTILINE | wx.SUNKEN_BORDER | wx.TE_READONLY )
        
        self.btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
        self.btn_watch = wx.Button(self.panel, label='Select Folder to Watch')
        self.btn_output = wx.Button(self.panel, label='Select Output Folder ')
        self.btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
        
        self.st1.SetForegroundColour((255,0,0)) # set text color
        self.tc.SetFont(font1)
        self.st1.SetFont(font)
        self.btn_start.SetFont(b_font)


        self.btn_start.Bind(wx.EVT_BUTTON, self.onStart)
        self.btn_output.Bind(wx.EVT_BUTTON, self.choose_output)
        self.btn_watch.Bind(wx.EVT_BUTTON, self.choose_watch)

        hbox1.Add(self.btn_watch )
        hbox1.Add(self.lbl_watch, 0 , wx.ALL | wx.EXPAND, 5)
        hbox2.Add(self.btn_output)
        hbox2.Add(self.lbl_output, 0 , wx.ALL | wx.EXPAND, 5)
        
        vbox.Add(self.st1,-1 ,  wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(self.btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(self.tc,2, wx.EXPAND| wx.ALL, 10)
        vbox.Add(hbox1,0, wx.EXPAND| wx.ALL, 10)
        vbox.Add(hbox2,0, wx.EXPAND| wx.ALL, 10)

        
        self.panel.SetSizer(vbox)
        self.Centre()
        self.Show()

        template = self.resource_path('template.xlsx')
        self.write_config()

    def write_config(self):  
        #Write Config
        global config
        global config_default

        if not os.path.exists(self.resource_path('config.ini')):
            #config["DEFAULT"] = {
            #    "watch_folder": os.path.abspath ("."),
            #    "output_folder": os.path.abspath (".")
            #}
            config['DEFAULT'] = {'watch_folder': os.path.abspath ("."), 'output_folder': os.path.abspath (".")}
            with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)
        
        #Check if sections are blank
        config.read(self.resource_path("config.ini"))
        config_default = config["DEFAULT"]

        if config_default["output_folder"] == '':
            config_default["output_folder"] = os.path.abspath (".")
            with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)

        if config_default["watch_folder"] == '': 
            config_default["watch_folder"] = os.path.abspath (".")
            with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)

        #Update labels/paths
        self.lbl_watch.SetLabel(config_default["watch_folder"])
        self.lbl_output.SetLabel(config_default["output_folder"])

    def onStart(self,event): 
        self._quit = False
        self.btn_output.Disable()
        self.btn_watch.Disable()
        self.btn_start.Disable()

        thread_pool_executor.submit(self.monitor_folder)
        thread_pool_executor.submit(self.active_listening)

    def choose_output(self, event):
        message = 'Select Output Folder'
        f_path = self.set_dir(message)

        if f_path == '':
            dlg = wx.MessageBox('Do you want to revert path to the default directory?' +  '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Output Filepath?',wx.YES_NO | wx.ICON_QUESTION)
            if dlg == wx.YES:
                #Update output folder
                f_path = os.getcwd()

            else:
                #f_path = self.lbl_output.SetLabel(f_path)
                return

        #Update output folder
        self.lbl_output.SetLabel(f_path)
        config_default["output_folder"] = f_path
               
        #Write changes back to file
        with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)

    def choose_watch(self, event):
        message = 'Select Watch Folder'
        f_path = self.set_dir(message)
        print(f_path)
       
        if f_path == '':
            dlg = wx.MessageBox('Do you want to revert path to the default directory?' +  '\n' + '\n' + 'Selecting "Yes" will revert the path to the application directory.','Revert Watch Filepath?',wx.YES_NO | wx.ICON_QUESTION)
            if dlg == wx.YES:
                #Update watch folder
                f_path = os.getcwd()

            else:
                #f_path = self.lbl_watch.SetLabel(f_path)
                return

        #Update watch folder
        self.lbl_watch.SetLabel(f_path)
        config_default["watch_folder"] = f_path
               
        #Write changes back to file
        with open(self.resource_path('config.ini'), 'w') as conf:
                config.write(conf)
        
    def set_dir(self, message):
        dlg = wx.DirDialog(
            self, message=message,
            style=wx.DD_DEFAULT_STYLE)

        # Show the dialog and retrieve the user response.
        if dlg.ShowModal() == wx.ID_OK:
            # load directory
            path = dlg.GetPath()
        else:
            path = ''

        # Destroy the dialog.
        dlg.Destroy()
        return path 

    def get_pdf_path(self):
        folder = self.lbl_output.GetLabel()
        if folder == '':
            folder = os.getcwd()
            self.lbl_output.SetLabel(folder) 
            config_default["output_folder"] = folder

        fileName = 'Test Data Report ' + str(datetime.datetime.now().strftime("%Y_%m_%d__%H.%M.%S")) + '.pdf'# + "\n" %H:%M:%S"
        #fileName = fileName.replace(':','.')
        path =os.path.join(folder, fileName)
        return path
        
    def getDirectory(self, filename): # For Excel Template
        # Construct path for file
        current_work_dir = os.getcwd()
        path = os.path.join(current_work_dir, filename)
        return path

    def getDirectoryCSV(self, filename): # For Watch Folder
        # Construct path for file
        current_work_dir = self.lbl_watch.GetLabel()
        if current_work_dir == '':
            current_work_dir = os.getcwd()
            self.lbl_watch.SetLabel(current_work_dir) 
            config_default["watch_folder"] = current_work_dir

        path = os.path.join(current_work_dir, filename)
        return path

    def process_csv(self,a_string):
        data_file = self.getDirectoryCSV(a_string)
        print(data_file)
        
        df = pd.read_fwf(data_file, header=None)
        df = df[0].str.split(',', expand=True)
        df.set_index(0, inplace = True)
        df.fillna("", inplace=True)

        self.excel_to_pdf(df, a_string)

    def excel_to_pdf(self, df,a_string):
    # Open Microsoft Excel
        pythoncom.CoInitialize()
        excel = client.Dispatch("Excel.Application")
        excel.Visible = False
        excel.ScreenUpdating = False
        excel.DisplayAlerts = False
        excel.EnableEvents = False
       
        # Read Excel File
        filepath = self.getDirectory(template)
        print (filepath)
        wb = excel.Workbooks.Open(filepath)
        work_sheets = wb.Worksheets('Form')

        #Write to sheet
        for key, items in v_dict.items():
            row_id = key
            for item in items:
                cel = str(item.split('|')[1])
                col_num = int(str(item.split('|')[0]))
                work_sheets.Range(cel).Value = df.loc[row_id][col_num]
        
        #Format
        #print_area = 'C1:I64'
        work_sheets.PageSetup.Zoom = False
        work_sheets.PageSetup.FitToPagesTall = 1
        work_sheets.PageSetup.TopMargin = 10
        work_sheets.PageSetup.BottomMargin = 10
        work_sheets.PageSetup.RightMargin = 10
        work_sheets.PageSetup.LeftMargin = 10
        #work_sheets.PageSetup.PrintArea = print_area
        # Convert into PDF File
        pdf_path = self.get_pdf_path()
        
        work_sheets.ExportAsFixedFormat(0, pdf_path)

        excel.ScreenUpdating = True
        excel.DisplayAlerts = True
        excel.EnableEvents = True
        
        wb.Close(SaveChanges=False)
        excel.Quit()

        text = 'PDF CREATED:   ' + pdf_path
        self.tc.AppendText(text + "\n" + "\n")

    def df_to_pdf(self,df):
        #Convert to PDF
        fig, ax =plt.subplots(figsize=(12,4))
        ax.axis('tight')
        ax.axis('off')
        the_table = ax.table(cellText=df.values,colLabels=df.columns,loc='center')

        pp = PdfPages("csv_data.pdf") # ADD DATE
        pp.savefig(fig, bbox_inches='tight')
        pp.close()

    def active_listening(self):
        font = wx.Font(12, wx.MODERN, wx.ITALIC, wx.BOLD, False, u'Arial Narrow',)
        self.st1.SetForegroundColour((0,128,0))
        self.st1.SetFont(font)

        m = 'Listening'
        self.st1.SetLabel(m)

        i = 1
        while self._quit == False:
            time.sleep(1)
            if i <= 3:
              m = m + "."
              self.st1.SetLabel(m)
              i = i + 1
            else: 
              i = 1
              m = 'Listening'

    def monitor_folder(self):
        #wx.CallAfter(self.print1)
        while self._quit == False:
            #path_to_watch = os.path.abspath (".")
            path_to_watch = self.lbl_watch.GetLabel()
            
            # FindFirstChangeNotification sets up a handle for watching
            #  file changes. The first parameter is the path to be
            #  watched; the second is a boolean indicating whether the
            #  directories underneath the one specified are to be watched;
            #  the third is a list of flags as to what kind of changes to
            #  watch for. We're just looking at file additions / deletions.
            #
            change_handle = win32file.FindFirstChangeNotification (
              path_to_watch,
              0,
              win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            #
            # Loop forever, listing any file changes. The WaitFor... will
            #  time out every half a second allowing for keyboard interrupts
            #  to terminate the loop.
            try:
              old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
              while 1:
                result = win32event.WaitForSingleObject (change_handle, 500)
                # If the WaitFor... returned because of a notification (as
                #  opposed to timing out or some error) then look for the
                #  changes in the directory contents.
                if result == win32con.WAIT_OBJECT_0:
                  new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
                  added = [f for f in new_path_contents if not f in old_path_contents]
                  deleted = [f for f in old_path_contents if not f in new_path_contents]
                  if added: 
                      print ("Added: ", ", ".join (added))
              
                      #Get file type
                      a_string = ", ".join (added)
                      length = len(a_string)
                      fType = a_string[length - 4:]

                      if fType == ".csv" or fType == ".txt":
                        data_file = a_string                        
                        thread_pool_executor.submit(self.process_csv,a_string) # Pause thread, run process_csv, and resume
                        

                  if deleted: print ("Deleted: ", ", ".join (deleted))

                  old_path_contents = new_path_contents
                  win32file.FindNextChangeNotification (change_handle)

            finally:
              win32file.FindCloseChangeNotification (change_handle)

    def resource_path(self, relative_path):
        """ Get absolute path to resource, works for dev and for PyInstaller """
        try:
            # PyInstaller creates a temp folder and stores path in _MEIPASS
            base_path = sys._MEIPASS
        except Exception:
            base_path = os.path.abspath(".")
        return os.path.join(base_path, relative_path)

def main():
    app = wx.App()
    ex = MainFrame(None, title='File Monitor')
    ex.Show()
    app.MainLoop()

if __name__ == '__main__':
    main()
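Separately from the threading question, `monitor_folder` has a problem when a single change notification reports several new files: `", ".join(added)` merges them into one string such as `"a.csv, b.csv"`, only its last four characters are checked, and `process_csv` then receives a file name that most likely does not exist on disk. A sketch of a hypothetical helper, `files_to_process`, that checks each new name on its own (it also lower-cases the extension, which the original check does not):

```python
import os

def files_to_process(added, extensions=(".csv", ".txt")):
    # One entry per new file, instead of a single ", ".join(added) string
    return [name for name in added
            if os.path.splitext(name)[1].lower() in extensions]

added = ["report1.csv", "notes.docx", "report2.TXT"]
print(", ".join(added))         # report1.csv, notes.docx, report2.TXT
print(files_to_process(added))  # ['report1.csv', 'report2.TXT']

# In monitor_folder, each file would then get its own job, e.g.:
#     for name in files_to_process(added):
#         thread_pool_executor.submit(self.process_csv, name)
```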


Answer 1:

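This is a classic use case for threading.Semaphore.

Instantiate a Semaphore(1) once, globally (in the main thread), and use it in every function that must not run concurrently. Call its acquire() and release() methods around the code that touches the "shared resource" to serialize access to it: inside monitor_folder or inside process_csv, depending on where you get the error (in this case, the CSV processing and its internal state).

With only one permit, an RLock would also work, but a semaphore better expresses what you are trying to achieve.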
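To see what a single permit buys you, the sketch below guards each job with `with guard:` (equivalent to acquire() followed by release() in a finally block) and records how many jobs are inside the guarded section at the same time. The `job` function is a hypothetical stand-in for process_csv; for this purpose a plain threading.Lock behaves the same as Semaphore(1):

```python
import threading
import time
from concurrent import futures

def max_overlap(guard, jobs=6):
    """Run `jobs` tasks on 5 workers, all guarded by `guard`; return peak overlap."""
    state = {"running": 0, "peak": 0}
    counter_lock = threading.Lock()  # protects `state` only

    def job():
        with guard:  # acquire() on entry, release() on exit, even on error
            with counter_lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.02)  # stand-in for the Excel work
            with counter_lock:
                state["running"] -= 1

    with futures.ThreadPoolExecutor(max_workers=5) as pool:
        for f in [pool.submit(job) for _ in range(jobs)]:
            f.result()
    return state["peak"]

print(max_overlap(threading.Semaphore(1)))  # 1
print(max_overlap(threading.Lock()))        # 1
print(max_overlap(threading.Semaphore(3)))  # at most 3
```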

Example:

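import threading

# Global declaration
semaphore = threading.Semaphore(1)

...

# Inside the process_csv method: hold the semaphore for the whole job.
# Wrapping only thread_pool_executor.submit() would not serialize anything,
# because submit() returns as soon as the job is queued.
def process_csv(self, a_string):
  semaphore.acquire()
  try:
    data_file = self.getDirectoryCSV(a_string)
    df = pd.read_fwf(data_file, header=None)
    df = df[0].str.split(',', expand=True)
    df.set_index(0, inplace=True)
    df.fillna("", inplace=True)
    self.excel_to_pdf(df, a_string)
  finally:
    semaphore.release()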
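Where the semaphore is held matters. submit() only queues the job and returns immediately, so a semaphore acquired and released around the submit() call is already free again while process_csv is still running. This sketch, with a hypothetical instrumented process_csv stand-in, measures the overlap when the semaphore wraps only submit():

```python
import threading
import time
from concurrent import futures

semaphore = threading.Semaphore(1)
lock = threading.Lock()
running = 0  # calls currently inside process_csv
peak = 0     # highest overlap observed

def process_csv(name):
    # Stand-in for the real method; records how many calls overlap
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.1)
    with lock:
        running -= 1

with futures.ThreadPoolExecutor(max_workers=5) as pool:
    for name in ["a.csv", "b.csv", "c.csv"]:
        semaphore.acquire()
        try:
            pool.submit(process_csv, name)  # returns at once
        finally:
            semaphore.release()  # released before process_csv has finished

print(peak)  # typically greater than 1: the jobs still overlap
```

Holding the semaphore inside the job itself, as in process_csv above, is what keeps the Excel calls apart.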

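Comments:

I'm not familiar with semaphores. I'll look into it. I've updated my post with the error message and the full code.

In your question you asked how to throttle thread task submission. While seeing more code might help pinpoint the problem, using a Semaphore directly will let you solve it yourself. If you're unsure how to use it, take a look at google.com/amp/s/www.geeksforgeeks.org/…

@cmccall95 I added an example. If you found my answer helpful, please accept it.

Wow, all that research and I had never come across semaphores. Such a simple fix. I'll definitely keep using it.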
