How to pause ThreadPool, run function, and resume?

Posted: 2021-11-10 23:14:05

Question:

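The following code is meant to trigger when a file is added to its folder. If the file type is .csv or .txt, the file needs to be read. At first, I got the message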

Message=[Errno 13] Permission denied: 'TestDataFile2021-09-11_15-54.csv'

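when trying to read the file. I believe this is because the folder is being used by the worker thread. I have been trying to figure out how to pause the thread, run the function, and then continue monitoring the folder.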

The error is raised here:

# Loop the data lines
with open(data_file, 'r+') as temp_f:

Where I need to pause:

if fType == ".csv" or fType == ".txt":
    data_file = a_string
    thread_pool_executor.submit(self.process_csv) # Pause thread, run process_csv, and resume
    self.process_csv()
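For reference: ThreadPoolExecutor.submit passes any extra positional arguments on to the callable and returns a concurrent.futures.Future, and calling result() on that Future blocks the calling thread until the job has finished. A minimal sketch of that "pause, run, resume" shape, assuming a process_csv that takes the file path as an argument; process_csv and on_file_added here are hypothetical stand-ins, not the code from the question.

```python
from concurrent import futures

def process_csv(path):
    # Hypothetical stand-in for the real CSV processing.
    return f"processed {path}"

def on_file_added(executor, path):
    # submit() forwards `path` to process_csv and returns a Future.
    future = executor.submit(process_csv, path)
    # result() blocks this (monitoring) thread until the job is done;
    # monitoring resumes as soon as this function returns.
    return future.result()

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    print(on_file_added(executor, "TestDataFile2021-09-11_15-54.csv"))
```

Calling process_csv(path) directly from the monitoring thread has the same effect without tying up a second worker. If on_file_added itself runs inside the same pool, waiting on result() occupies two workers at once, so the pool needs a free slot or it can deadlock.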

Full code:

import os
import win32file
import win32event
import win32con

from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx

import glob
from os.path import splitext
from concurrent import futures
import sys
import time

thread_pool_executor = futures.ThreadPoolExecutor(max_workers=3)

class MainFrame(wx.Frame):
    def __init__(self, parent, title):
        super(MainFrame, self).__init__(parent, title=title,size=(600,400))

        self.panel = wx.Panel(self)
        self.panel.SetBackgroundColour("light gray")
        #Create sizers
        vbox = wx.BoxSizer(wx.VERTICAL)
        #Create widgets
        st1 = wx.StaticText(self.panel, label='Script is not running.')
        tc = wx.TextCtrl(self.panel)
        btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
        btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
        #self.btn_login.SetFont(self.b_font)
        btn_start.Bind(wx.EVT_BUTTON, self.onStart)

        vbox.Add(st1,-1 ,  wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(tc,2, wx.EXPAND| wx.ALL, 10)

        #Layout
        self.panel.SetSizer(vbox)
        self.Centre()
        self.Show()

    def onStart(self,event):
        print('Listening')
        self._quit = False
        thread_pool_executor.submit(self.monitor_folder)
        thread_pool_executor.submit(self.active_listening)

    def process_csv(self):
        data_file = "TestDataFile2021-09-11_15-54.csv"
        data_file_delimiter = ','
        largest_column_count = 0

        # Loop the data lines
        with open(data_file, 'r+') as temp_f:
            lines = temp_f.readlines()
            for l in lines:
                column_count = len(l.split(data_file_delimiter)) + 1
                largest_column_count = column_count if largest_column_count < column_count else largest_column_count
        column_names = [i for i in range(0, largest_column_count)]

        # Read csv
        df = pd.read_csv(data_file,  delimiter=data_file_delimiter, names=column_names) #header=None,
        df.fillna("", inplace=True)
        print(df)

    def active_listening(self):
        m = 'Listening'
        i = 1
        while self._quit == False:
            time.sleep(2)
            if i <= 3:
              m = m + "."
              print(m)
              i = i + 1
            else:
              i = 1
              m = 'Listening'

    def monitor_folder(self):
        #wx.CallAfter(self.print1)
        while self._quit == False:
            path_to_watch = os.path.abspath (".")

            # FindFirstChangeNotification sets up a handle for watching
            #  file changes. The first parameter is the path to be
            #  watched; the second is a boolean indicating whether the
            #  directories underneath the one specified are to be watched;
            #  the third is a list of flags as to what kind of changes to
            #  watch for. We're just looking at file additions / deletions.
            #
            change_handle = win32file.FindFirstChangeNotification (
              path_to_watch,
              0,
              win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            #
            # Loop forever, listing any file changes. The WaitFor... will
            #  time out every half a second allowing for keyboard interrupts
            #  to terminate the loop.
            try:
              old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
              while 1:
                result = win32event.WaitForSingleObject (change_handle, 500)
                # If the WaitFor... returned because of a notification (as
                #  opposed to timing out or some error) then look for the
                #  changes in the directory contents.
                if result == win32con.WAIT_OBJECT_0:
                  new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
                  added = [f for f in new_path_contents if not f in old_path_contents]
                  deleted = [f for f in old_path_contents if not f in new_path_contents]
                  if added:
                      print ("Added: ", ", ".join (added))

                      #Get file type
                      a_string = ", ".join (added)
                      length = len(a_string)
                      fType = a_string[length - 4:]

                      if fType == ".csv" or fType == ".txt":
                          data_file = a_string
                          thread_pool_executor.submit(self.process_csv) # Pause thread, run process_csv, and resume
                          self.process_csv()

                      else:
                          print('Not what we want')

                  if deleted: print ("Deleted: ", ", ".join (deleted))

                  old_path_contents = new_path_contents
                  win32file.FindNextChangeNotification (change_handle)

            finally:
              win32file.FindCloseChangeNotification (change_handle)

def main():
    app = wx.App()
    ex = MainFrame(None, title='Border')
    ex.Show()
    app.MainLoop()

if __name__ == '__main__':
    main()
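Two details in the full code above affect which file actually gets read: the extension check slices the last four characters of all added names joined with ", ", so when one notification reports several files only the last one is tested; and data_file = a_string only sets a local variable in monitor_folder, while process_csv always opens its own hard-coded filename. A small sketch of checking each added name on its own with os.path.splitext (the code already imports splitext); csv_or_txt_files is a hypothetical helper name.

```python
import os

def csv_or_txt_files(added):
    # Test each added name separately instead of slicing a joined string.
    # .lower() also accepts ".CSV"/".TXT"; drop it for an exact match.
    return [f for f in added if os.path.splitext(f)[1].lower() in (".csv", ".txt")]

print(csv_or_txt_files(["a.csv", "notes.TXT", "image.png"]))
```

Each returned name could then be passed to process_csv as an argument rather than relying on the hard-coded filename.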


Answer 1:

I can't run your program, so this is somewhat speculative.

If accessing the file really is a race condition inside your program, you can solve it with a lock.

from threading import Lock
....

foo = Lock()    #this is a global level variable

Every access to the file, directory, or other protected resource then wraps the operation in the lock:

with foo:
    with open(data_file, 'r+') as temp_f:
       ...

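The important part is making sure that every place in the code that accesses the resource is wrapped like this. That guarantees only one thread can access the resource at any given time. If a thread tries to acquire the lock while another thread holds it, the attempt blocks and only continues once the other thread releases the lock.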
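A self-contained illustration of that pattern, using an in-memory counter in place of the file so it runs anywhere (an assumption for the sake of the example): every read-modify-write goes through the same threading.Lock, and the with statement releases the lock automatically when the block exits, even on an exception, so there is no separate unlock call to forget.

```python
import threading

lock = threading.Lock()   # one module-level lock shared by every thread
counter = 0               # stands in for the shared file

def increment(times):
    global counter
    for _ in range(times):
        # Entering the with-block acquires the lock; leaving it releases it.
        # Explicit equivalent: lock.acquire(); try: ... finally: lock.release()
        with lock:
            current = counter      # read
            counter = current + 1  # write, with no other thread in between

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)         # 40000
print(lock.locked())   # False: every thread released the lock
```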

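If the problem is a race condition with an external source, this won't work. If an external program creates the files your program is trying to read, your operating system may protect the file while the other process still has it open for writing and refuse to let you access it. In that case the right approach is to catch the exception, wait a moment, and try again, repeating until it succeeds.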
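A minimal sketch of that catch-wait-retry approach, assuming the failure surfaces as PermissionError (the exception Python raises for "[Errno 13] Permission denied"). It also opens the file read-only: mode 'r+', as used in the question, requests write access as well, which is more than the reader needs. The helper name read_when_available and the attempts/delay defaults are illustrative choices.

```python
import time

def read_when_available(path, attempts=10, delay=0.5):
    """Read `path`, retrying while another process still blocks access."""
    for attempt in range(attempts):
        try:
            # Read-only mode asks for fewer permissions than 'r+'.
            with open(path, "r") as f:
                return f.read()
        except PermissionError:
            if attempt == attempts - 1:
                raise            # out of retries: surface the original error
            time.sleep(delay)    # wait a moment, then try again
```

The attempt limit is a deliberate departure from "keep trying until it succeeds", so a file that never becomes readable does not hang the monitoring thread forever.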

Comments:

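As far as I understand, the files are generated by software. I can open the file as long as no threads are running. However, I need the thread functionality to resume right after that finishes. How do I unlock it? I tried the above, but unfortunately it still doesn't work for me.

Answer 2: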

I figured out a solution. It seems you can't interact with a file while another thread is interacting with the same folder through Win32.

This:

        data_file = "TestDataFile2021-09-11_15-54.csv"
        data_file_delimiter = ','
        largest_column_count = 0

        #with FileLock('TestDataFile2021-09-11_15-54.csv'):
        #with thread_lock:
        #Loop the data lines
        with open(data_file, 'r+') as temp_f:
            lines = temp_f.readlines()
            for l in lines:
                column_count = len(l.split(data_file_delimiter)) + 1
                largest_column_count = column_count if largest_column_count < column_count else largest_column_count

        column_names = [i for i in range(0, largest_column_count-1)]

        # Read csv
        df = pd.read_csv(data_file,  delimiter=data_file_delimiter, names=column_names) #header=None, # , names=column_names
        df.fillna("", inplace=True)
        df.set_index(0, inplace = True)

was replaced with:

        df = pd.read_fwf('TestDataFile2021-09-11_15-54.csv', header=None)
        df = df[0].str.split(',', expand=True)
        df.set_index(0, inplace = True)
        df.fillna("", inplace=True)
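One difference worth noting: the replacement never opens the file in 'r+' mode, because pandas opens it for reading only. That may be why it no longer fails, though this is an inference the answer does not confirm. If the goal is simply to load rows with differing numbers of fields, here is a sketch using the standard csv module that reads the file once, read-only, and pads short rows with empty strings. read_ragged_csv is a hypothetical helper name.

```python
import csv
import pandas as pd

def read_ragged_csv(path, delimiter=","):
    # Read-only, single pass; csv.reader also handles quoted delimiters.
    with open(path, "r", newline="") as f:
        rows = list(csv.reader(f, delimiter=delimiter))
    width = max((len(r) for r in rows), default=0)
    # Pad short rows with "" (the original used fillna("") for this).
    padded = [r + [""] * (width - len(r)) for r in rows]
    df = pd.DataFrame(padded)
    # Like the answer, use the first column as the index.
    return df.set_index(0) if width else df
```

Every value stays a string here, unlike pd.read_csv, which would infer numeric types. Convert columns afterwards if numbers are needed.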

