从 Python 并行批量读取文件
Posted
技术标签:
【中文标题】从 Python 并行批量读取文件【英文标题】:Read file in parallel batches from Python 【发布时间】:2017-06-26 19:56:38 【问题描述】:我在二进制文件中有数百 GB 的数据。我想对数据进行随机抽样,多次随机读取几条连续的记录。
数据存储在许多文件中。主文件不按任何特定顺序存储数据,因此每个文件都有一个排序的索引文件。我现在的代码是这样的,只不过文件很多:
index = open("foo.index", 'rb')
data = open("foo", 'rb')
index_offset_format = 'Q'
index_offset_size = struct.calcsize(index_offset_format)
record_set = []
for _ in range(n_batches):
# Read `batch_size` offsets from the index - these are consecutive,
# so they can be read in one operation
index_offset_start = random.randint(0, N_RECORDS - batch_size)
index.seek(index_offset_start)
data_offsets = struct.iter_unpack(
index_offset_format,
index.read(index_offset_size * batch_size))
# Read actual records from data file. These are not consecutive
records = []
for offset in data_offsets:
data.seek(offset)
records.append(data.read(RECORD_SIZE))
record_set.append(records)
然后对记录进行其他操作。从 profiling 中,我看到该程序是 IO 密集型的,大部分时间都花在index.read
和data.read
上。我怀疑这是因为read
正在阻塞:解释器在请求下一个随机数据块之前等待操作系统从磁盘读取数据,因此操作系统没有机会优化磁盘访问模式。那么:是否有一些文件 API 可以传递一批指令?比如:
def read_many(file, offsets, lengths):
'''
@param file: the file to read from
@param offsets: the offsets to seek to
@param lengths: the lengths of data to read
@return an iterable over the file contents at the requested offsets
'''
或者,打开多个文件对象并使用多线程请求多次读取就足够了吗?或者GIL 会阻止它发挥作用吗?
【问题讨论】:
相关:***.com/questions/29270818/…. 文件的最小、最大和平均大小是多少? 【参考方案1】:由于进程是 IO 绑定的,因此读取的边界由操作系统的磁盘操作调度程序和磁盘的缓存设置。
使用multiprocessing.Pool.imap_unordered()
可以轻松实现实际的每核并行化:
def pmap(fun, tasks):
from multiprocessing import Pool
with Pool() as pool:
yield from pool.imap_unordered(fun, tasks)
for record_set in pmap(process_one_file, filenames):
print(record_set)
同时打开多个文件,并且每个内核可能正在执行read()
,应该允许磁盘调度程序找出比文件名列表强加的串行调度更好的调度。
imap_unordered()
的美妙之处在于它将后处理与一个任务比另一个更早完成的时间、方式和原因分离(不同运行的顺序可能不同)。
如 cmets 中所述,GIL 仅在 Python 代码执行期间涉及,而对于 I/O 阻塞的程序则不涉及。
【讨论】:
以上是关于从 Python 并行批量读取文件的主要内容,如果未能解决你的问题,请参考以下文章
如何批量读取csv格式的文件名及文件内容到新的Excel中?