如何暂停线程池、运行函数和恢复?
Posted
技术标签:
【中文标题】如何暂停线程池、运行函数和恢复?【英文标题】:How to pause ThreadPool, run function, and resume? 【发布时间】:2021-11-10 23:14:05 【问题描述】:以下代码旨在在将文件添加到其文件夹时触发。如果文件类型为.csv
或.txt
,则需要读取文件。起初,我收到消息
Message=[Errno 13] Permission denied: 'TestDataFile2021-09-11_15-54.csv'
尝试读取文件时。我相信这是因为该文件夹正在被工作线程使用。我一直在想办法暂停线程、运行函数并继续监视文件夹。
错误:
# Loop the data lines
with open(data_file, 'r+') as temp_f:
我需要暂停的地方:
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv) # Pause thread, run process_csv, and resume
self.process_csv()
完整代码:
import os
import win32file
import win32event
import win32con
from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx
import glob
from os.path import splitext
from concurrent import futures
import sys
import time
thread_pool_executor = futures.ThreadPoolExecutor(max_workers=3)
class MainFrame(wx.Frame):
def __init__(self, parent, title):
super(MainFrame, self).__init__(parent, title=title,size=(600,400))
self.panel = wx.Panel(self)
self.panel.SetBackgroundColour("light gray")
#Create sizers
vbox = wx.BoxSizer(wx.VERTICAL)
#Create widgets
st1 = wx.StaticText(self.panel, label='Script is not running.')
tc = wx.TextCtrl(self.panel)
btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
#self.btn_login.SetFont(self.b_font)
btn_start.Bind(wx.EVT_BUTTON, self.onStart)
vbox.Add(st1,-1 , wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
vbox.Add(tc,2, wx.EXPAND| wx.ALL, 10)
#Layout
self.panel.SetSizer(vbox)
self.Centre()
self.Show()
def onStart(self,event):
print('Listening')
self._quit = False
thread_pool_executor.submit(self.monitor_folder)
thread_pool_executor.submit(self.active_listening)
def process_csv(self):
data_file = "TestDataFile2021-09-11_15-54.csv"
data_file_delimiter = ','
largest_column_count = 0
# Loop the data lines
with open(data_file, 'r+') as temp_f:
lines = temp_f.readlines()
for l in lines:
column_count = len(l.split(data_file_delimiter)) + 1
largest_column_count = column_count if largest_column_count < column_count else largest_column_count
column_names = [i for i in range(0, largest_column_count)]
# Read csv
df = pd.read_csv(data_file, delimiter=data_file_delimiter, names=column_names) #header=None,
df.fillna("", inplace=True)
print(df)
def active_listening(self):
m = 'Listening'
i = 1
while self._quit == False:
time.sleep(2)
if i <= 3:
m = m + "."
print(m)
i = i + 1
else:
i = 1
m = 'Listening'
def monitor_folder(self):
#wx.CallAfter(self.print1)
while self._quit == False:
path_to_watch = os.path.abspath (".")
# FindFirstChangeNotification sets up a handle for watching
# file changes. The first parameter is the path to be
# watched; the second is a boolean indicating whether the
# directories underneath the one specified are to be watched;
# the third is a list of flags as to what kind of changes to
# watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification (
path_to_watch,
0,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME
)
#
# Loop forever, listing any file changes. The WaitFor... will
# time out every half a second allowing for keyboard interrupts
# to terminate the loop.
try:
old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
result = win32event.WaitForSingleObject (change_handle, 500)
# If the WaitFor... returned because of a notification (as
# opposed to timing out or some error) then look for the
# changes in the directory contents.
if result == win32con.WAIT_OBJECT_0:
new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
added = [f for f in new_path_contents if not f in old_path_contents]
deleted = [f for f in old_path_contents if not f in new_path_contents]
if added:
print ("Added: ", ", ".join (added))
#Get file type
a_string = ", ".join (added)
length = len(a_string)
fType = a_string[length - 4:]
if fType == ".csv" or fType == ".txt":
data_file = a_string
thread_pool_executor.submit(self.process_csv) # Pause thread, run process_csv, and resume
self.process_csv()
else:
print('Not what we want')
if deleted: print ("Deleted: ", ", ".join (deleted))
old_path_contents = new_path_contents
win32file.FindNextChangeNotification (change_handle)
finally:
win32file.FindCloseChangeNotification (change_handle)
def main():
app = wx.App()
ex = MainFrame(None, title='Border')
ex.Show()
app.MainLoop()
if __name__ == '__main__':
main()
【问题讨论】:
【参考方案1】:无法运行你的程序,所以这有点推测。
如果访问文件确实是程序中的竞争条件,则可以使用锁来解决。
from threading import Lock
....
foo = Lock() #this is a global level variable
每次访问文件/目录/其他受保护的资源都会用锁封装操作:
with foo:
with open(data_file, 'r+') as temp_f:
...
重要的部分是确保代码中访问资源的每个位置都像这样封装。这确保只有一个线程可以在任何特定时间访问资源。如果一个线程在被另一个线程持有的情况下尝试获取锁,则该尝试会阻塞,并且只有在另一个线程释放锁时才会继续。
如果问题是与外部来源的竞争条件,那么这不起作用。如果外部程序创建了您的程序尝试读取的那些文件,您的操作系统可能会保护该文件,同时保持打开状态以供其他进程写入,并且不允许您访问它。这种情况下,正确的做法是捕获异常,稍等片刻再试,一直试,直到成功。
【讨论】:
据我了解,这些文件是由软件生成的。只要没有线程运行,我就可以打开它。但是,我需要立即恢复线程功能完成。如何解锁? 我尝试了上述方法,但不幸的是它仍然对我不起作用。【参考方案2】:我想出了一个解决办法。当另一个线程与 Win32 在同一文件夹中交互时,您似乎无法与文件交互。
这个:
data_file = "TestDataFile2021-09-11_15-54.csv"
data_file_delimiter = ','
largest_column_count = 0
#with FileLock('TestDataFile2021-09-11_15-54.csv'):
#with thread_lock:
#Loop the data lines
with open(data_file, 'r+') as temp_f:
lines = temp_f.readlines()
for l in lines:
column_count = len(l.split(data_file_delimiter)) + 1
largest_column_count = column_count if largest_column_count < column_count else largest_column_count
column_names = [i for i in range(0, largest_column_count-1)]
# Read csv
df = pd.read_csv(data_file, delimiter=data_file_delimiter, names=column_names) #header=None, # , names=column_names
df.fillna("", inplace=True)
df.set_index(0, inplace = True)
被替换为:
df = pd.read_fwf('TestDataFile2021-09-11_15-54.csv', header=None)
df = df[0].str.split(',', expand=True)
df.set_index(0, inplace = True)
df.fillna("", inplace=True)
【讨论】:
以上是关于如何暂停线程池、运行函数和恢复?的主要内容,如果未能解决你的问题,请参考以下文章