如何按时间顺序使用多处理?
Posted
技术标签:
【中文标题】如何按时间顺序使用多处理?【英文标题】:How to use multiprocessing in a chronical order? 【发布时间】:2021-11-04 03:44:55 【问题描述】:我有一个包含 2 个进程的 python 脚本:
-
过程 1:加载和解压缩文件
过程 2:处理文件,用它做一些事情。
在实施多处理之前,该软件似乎按时间顺序完成其工作。加载所有压缩文件,解压缩它们,然后打开它们来做一些事情。
所以我在游戏中引入了多处理,现在似乎在加载和解压缩文件的同时,打开和处理它们的过程已经开始。所以有多个进程同时做事。问题是,当我在大数据(超过 100 个文件)上运行此代码时,我遇到了并发文件访问的问题。这导致PermissionError: [WinError 32] The process cannot access the file because it is being used by another process:
当我在一个小数据集(大约 30 个文件)上运行 sn-p 时,它似乎没问题,因为文件在进程 2 开始时被解压缩得非常快。
我想要什么:我想保留多处理,因为它可以加快速度,但我希望只有在所有文件都已解压缩(例如,进程 1 已完成)时才启动进程 2。
这是我的 sn-p:
import os
import csv
import collections
import datetime
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool
def generate_file_lists():
# Change the following line to a real path
data_files = 'c:\desktop\DataEnergy'
pattern = '*.zip'
last_root = None
args = []
for root, dirs, files in os.walk(data_files):
for filename in fnmatch.filter(files, pattern):
if root != last_root:
last_root = root
if args:
yield args
args = []
args.append((root, filename))
if args:
yield args
def unzip(file_list):
"""
file_list is a list of (root, filename) tuples where
root is the same for all tuples.
"""
# Change the following line to a real path:
counter_part = 'c:\desktop\CounterPart'
for root, filename in file_list:
path = os.path.join(root, filename)
date_zipped_file_s = re.search('-(.\d+)-', filename).group(1)
date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()
#Create the new directory location
new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start='c:\desktop\DataEnergy'), ".."))
#Join the directory names counter_part and create their paths.
new = os.path.join(counter_part, new_dir)
#Create the directories
if (not os.path.exists(new)):
os.makedirs(new)
zipfile.ZipFile(path).extractall(new)
#Get al the zipped files
files = os.listdir(new)
#Rename all the files in the created directories
for file in files:
filesplit = os.path.splitext(os.path.basename(file))
if not re.search(r'_\d8.', file):
os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))
# Required for Windows:
if __name__ == '__main__':
pool = Pool(13)
pool.map(unzip, generate_file_lists())
print('the files have been unzipped!')
#Start proces 2
all_missing_dates = ['20210701', '20210702']
missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
dates_to_process = []
root = Path('.\middle_stage').resolve()
at_set = 'Audi', 'Mercedes', 'Volkswagen'
#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date):
if set(row).intersection(at_set):
if len(r) > 24 and r[24].isdigit():
aantal_pplgs = int(r[24])
date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
condition_3 = date_time.date() == missing_date if len(r) > 3 else True
return condition_3
return False
#Open the files and read the rows
print("Start reading data")
data_per_date = dict()
for missing_date in missing_dates:
print("\tReading missing date: ", missing_date)
files=[fn for fn in (e for e in root.glob(f"**/*_missing_date:%Y%m%d.txt") if e.is_file())]
if len(files) != 13:
continue
dates_to_process.append(missing_date)
vehicle_loc_dict = collections.defaultdict(list)
for file in files:
with open(file, 'r') as log_file:
reader = csv.reader(log_file, delimiter = ',')
next(reader) # skip header
for row in reader:
if filter_row(row, missing_date):
print('filter_row has been executed!')
data_per_date[missing_date] = vehicle_loc_dict
【问题讨论】:
您需要使用锁定,在另一个进程仍在处理文件时锁定对文件的访问。您也可以使用队列系统,其中解压缩从压缩文件队列中提取,然后将其添加到要处理的队列等中,其他线程从该队列中提取。 @Booboo,这是我昨天和你谈过的地方。 @SeanPowell 这些是一些不错的选择,但是我不知道如何在上面的代码中实现它.. 我添加了一个答案,可以让您大致了解如何实现此功能,如果您需要其他任何内容,请随时问我 :) @Mediterráneo 我刚看到这个问题。我没有收到您的评论通知,因为我之前没有评论过 this 帖子;你应该在你之前的问题上写下这个评论,并附上这个新问题的链接。有机会我会深入研究。 【参考方案1】:问题似乎如下:
如果您在 Windows 下运行(并且根据您的目录名称,情况似乎是这样),每当您创建一个新进程(这里您通过创建一个多处理池来创建 13 个新进程)时,spawn 方法创建进程。这意味着将创建一个新的空地址空间,在其中重新启动 Python 解释器,并从顶部重新执行源程序以通过在全局范围内执行所有语句来初始化地址空间 except 以if __name__ == '__main__':
开头的块中的任何语句,因为在这些新进程中,此条件将为False
。这也是为什么您将创建新进程的代码放置在这样一个块中的原因,即,这样您就不会陷入创建新进程的递归循环ad inifinitum。
也就是说,您所谓的 process 2 语句在全局范围内,不在 if __name__ == '__main__':
块内,因此它们在初始化多处理池时并行执行 13 次。但我可以想象一个场景,池的进程 1 执行代码无效,因为尚未解压缩任何内容,然后现在它已被初始化,它开始解压缩文件。与此同时,池中的其他进程开始运行其初始化代码,现在发生了冲突。
解决方法是将流程2代码移动如下:
import os
import csv
import collections
import datetime
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool
def generate_file_lists():
# Change the following line to a real path
data_files = 'c:\desktop\DataEnergy'
pattern = '*.zip'
last_root = None
args = []
for root, dirs, files in os.walk(data_files):
for filename in fnmatch.filter(files, pattern):
if root != last_root:
last_root = root
if args:
yield args
args = []
args.append((root, filename))
if args:
yield args
def unzip(file_list):
"""
file_list is a list of (root, filename) tuples where
root is the same for all tuples.
"""
# Change the following line to a real path:
counter_part = 'c:\desktop\CounterPart'
for root, filename in file_list:
path = os.path.join(root, filename)
date_zipped_file_s = re.search('-(.\d+)-', filename).group(1)
date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()
#Create the new directory location
new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start='c:\desktop\DataEnergy'), ".."))
#Join the directory names counter_part and create their paths.
new = os.path.join(counter_part, new_dir)
#Create the directories
if (not os.path.exists(new)):
os.makedirs(new)
zipfile.ZipFile(path).extractall(new)
#Get al the zipped files
files = os.listdir(new)
#Rename all the files in the created directories
for file in files:
filesplit = os.path.splitext(os.path.basename(file))
if not re.search(r'_\d8.', file):
os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))
return False
def process1():
pool = Pool(13)
pool.map(unzip, generate_file_lists())
print('the files have been unzipped!')
#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date):
if set(row).intersection(at_set):
if len(r) > 24 and r[24].isdigit():
aantal_pplgs = int(r[24])
date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
condition_3 = date_time.date() == missing_date if len(r) > 3 else True
return condition_3
def process2():
#Start proces 2
all_missing_dates = ['20210701', '20210702']
missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
dates_to_process = []
root = Path('.\middle_stage').resolve()
at_set = 'Audi', 'Mercedes', 'Volkswagen'
#Open the files and read the rows
print("Start reading data")
data_per_date = dict()
for missing_date in missing_dates:
print("\tReading missing date: ", missing_date)
files=[fn for fn in (e for e in root.glob(f"**/*_missing_date:%Y%m%d.txt") if e.is_file())]
if len(files) != 13:
continue
dates_to_process.append(missing_date)
vehicle_loc_dict = collections.defaultdict(list)
for file in files:
with open(file, 'r') as log_file:
reader = csv.reader(log_file, delimiter = ',')
next(reader) # skip header
for row in reader:
if filter_row(row, missing_date):
print('filter_row has been executed!')
data_per_date[missing_date] = vehicle_loc_dict
def main():
process1()
process2()
if __name__ == '__main__':
main()
【讨论】:
如果我也有进程 3 怎么办?那是否也需要在if __name__ == '__main__'
下缩进
是的。我认为最好创建函数process1
、process2
和process3
,然后在你的if __name__ == '__main__':
块中调用3 个函数是连续的。或者更好的是只调用函数main
,它调用三个进程函数。查看更新的答案。
说得非常简单(抱歉英语不是我的母语)。 if name 块之外的所有内容都与块内的内容并行执行。这会导致两个不同的进程彼此靠近而不是在彼此之后运行。
还有一个问题,如果我也想为进程 2 实现多处理怎么办?例如,我是否需要创建新的文件列表?
但是process2
似乎没有处理相同的文件列表,还是我遗漏了什么?在这种情况下,只需在 main
中创建 pool
并将其传递给两个函数。【参考方案2】:
主线程
在主线程中,我们需要设置队列并将压缩文件添加到队列中
import threading
import queue
zippedQueue = queue.Queue()
unzippedQueue = queue.Queue()
zippedLock = threading.Lock()
for file in files:
zippedQueue.put(file)
工作线程
class ChiSquaredThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
unzipFile()
# add all your zipped files to the zippedQueue
def unzipFile():
zippedLock.acquire()
if not zippedQueue.empty():
zipped = zippedQueue.get()
zippedLock.release()
# unzip the zipped file here
# add unziped file to queue
unzipedQueue.put(unzippedFile)
然后创建一个与工作线程块类似的块,它执行相同的步骤并处理文件。工作线程中的这个示例块应该可以指导您
【讨论】:
好吧,这似乎是很好的第一步。但是,我很难找到你的 sn-p 和我的关系。那么如何将我的压缩文件添加到zippedQueue
循环遍历主线程中的所有文件并使用zippedQueue.put(file)
然后让您的解压缩线程运行 processFile() 函数并添加解压缩文件的部分。然后创建一个类似的函数来处理文件。
主线程是指def processFile
?我是这个处理/线程的新手,但我真的很想了解它。
@Mediterráneo 我现在编辑了它看看是否对你有帮助
那么我们首先要做什么呢?将文件添加到队列或解压缩它们?还有我的 sn-p 的哪一部分必须被删除才能为你的位置腾出位置?以上是关于如何按时间顺序使用多处理?的主要内容,如果未能解决你的问题,请参考以下文章