在 Python 中划分大文件以进行多处理的最佳方法是啥?
Posted
技术标签:
【中文标题】在 Python 中划分大文件以进行多处理的最佳方法是啥?【英文标题】:What's the best way to divide large files in Python for multiprocessing?在 Python 中划分大文件以进行多处理的最佳方法是什么? 【发布时间】:2010-12-21 20:27:18 【问题描述】:我遇到了很多“令人尴尬的并行”项目,我想与 multiprocessing
模块并行化。但是,它们通常涉及读取大文件(大于 2gb),逐行处理它们,运行基本计算,然后写入结果。使用 Python 的多处理模块拆分文件并处理它的最佳方法是什么?应该使用multiprocessing
中的Queue
或JoinableQueue
吗?还是Queue
模块本身?或者,我应该使用multiprocessing
将可迭代的文件映射到进程池吗?我已经尝试过这些方法,但是逐行分发数据的开销是巨大的。我已经通过使用cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2
确定了轻量级管道过滤器设计,它将第一个进程的输入的一定百分比直接传递给第二个输入(参见this post),但我希望有一个完全包含的解决方案在 Python 中。
令人惊讶的是,Python 文档并没有建议这样做的规范方法(尽管multiprocessing
文档中关于编程指南的部分很长)。
谢谢, 文斯
附加信息:每行的处理时间各不相同。有些问题很快,几乎不受 I/O 限制,有些受 CPU 限制。受 CPU 限制的非依赖任务将从并行化中获得优势,因此即使将数据分配给处理功能的低效方式在挂钟时间方面仍然是有益的。
一个典型的例子是一个脚本,它从行中提取字段,检查各种按位标志,并将带有某些标志的行以全新的格式写入新文件。这似乎是一个 I/O 绑定问题,但是当我使用带有管道的廉价并发版本运行它时,它快了大约 20%。当我使用 pool 和 map 运行它,或者在 multiprocessing
中排队时,它总是慢 100% 以上。
【问题讨论】:
这是我对原本花哨的脚本语言的一大抱怨——简单的并发计算是没有线程的痛苦。当然,您可以完成它,但是使用线程和锁模型,有些工作会简单得多。 线程“并行”版本(我相信)永远不会更快,除了线程比进程创建速度更快。 GIL 是 CPU 密集型多线程程序的一个巨大瓶颈。此外,没有需要在进程/线程之间共享的可变对象,因此多线程并不真正需要多处理。 @Vince 实际上,这完全取决于具体情况。在你的,它可能永远不会。在其他情况下,它可能。我的观点是,对于我需要执行的大多数并发操作(在 C 中),当线程和锁提供更简单的模型时,很少有理由使用正确 IPC 所需的额外操作。对于需要更好地扩展并跨不同机器的更大问题,情况就不同了。 @san,我不应该说“从不”——我同意。对于某些网络绑定或 I/O 绑定的情况,线程肯定会更快。 @Vince 是的,这就是我的来历。除了我的硕士研究(我在 Python 中完成的)之外,我的实际并发编程一直处于这种情况下:要么从慢速物理设备读取并在另一个线程上做出反应或计算,要么只是试图在我 / O 正在进行中。 【参考方案1】:最好的架构之一已经成为 Linux 操作系统的一部分。不需要特殊的库。
你想要一个“扇出”设计。
“主”程序创建了许多通过管道连接的子进程。
主程序读取文件,将行写入管道,执行将行处理到适当的子进程所需的最小过滤。
每个子进程可能应该是从标准输入读取和写入的不同进程的管道。
您不需要队列数据结构,这正是内存中的管道——两个并发进程之间的字节队列。
【讨论】:
我将研究在 Python 中实现这种方法,因为多处理模块具有管道。正如您在原始帖子中看到的,我在 shell 中使用了这种方法,并取得了巨大的成功。我曾经天真地认为我永远无法使用管道实现数据并行。 简单的壳管是并行的理想形式。这是 Linux 最擅长的。这通常是完美的解决方案。 这是结果:github.com/vsbuffalo 和在 32 CPU 机器上的结果 paste.pocoo.org/show/154252。谢谢 S.Lott! @Vince:请使用enumerate
函数而不是您自己的计数器。它从事物中删除了另外两行代码,从而导致了更多的简化。操作系统功能——用于简单的并行进程——有时就是你所需要的。【参考方案2】:
一种策略是为每个工作人员分配一个偏移量,因此如果您有 8 个工作进程,则分配编号 0 到 7。工作编号 0 读取第一个记录处理,然后跳过 7 并继续处理第 8 个记录等, 1号工人读取第二条记录然后跳过7并处理第9条记录............
这个方案有很多优点。不管文件有多大,工作总是被平均分配,同一台机器上的进程将以大致相同的速率处理,并使用相同的缓冲区,因此您不会产生任何过多的 I/O 开销。只要文件尚未更新,您就可以重新运行各个线程以从故障中恢复。
【讨论】:
【参考方案3】:你没有提到你是如何处理这些行的;可能是最重要的信息。
每一行都是独立的吗?计算是否依赖于一行在下一行之前?必须分块处理吗?每条线的处理需要多长时间?是否有一个处理步骤必须在最后包含“所有”数据?或者是否可以丢弃中间结果而只保留一个运行总数?可以通过将文件大小除以线程数来初始拆分文件吗?或者它会随着您的处理而增长?
如果行是独立的并且文件没有增长,您唯一需要的协调就是将“起始地址”和“长度”分配给每个工作人员;他们可以独立打开并查找文件,然后您必须简单地协调他们的结果;也许通过等待 N 个结果返回到队列中。
如果行不是独立的,答案将很大程度上取决于文件的结构。
【讨论】:
对不起,每个文件都是独立的,没有任何依赖,没有任何共享(可选计数器除外)。一个典型的例子是一个函数,它接受一行,决定是否要保留它,对保留的行执行一些小计算,格式化这些计算,然后将这些行写入该过程的文件。然后可以在一个单独的过程中将所有文件连接在一起。关于文件搜索——搜索是通过 Python 中的字节数来完成的,这可能会在匹配行和字节时引入复杂性。值得吗? PS:文件不会增长,中间结果会附加到文件中(每个进程一个文件,以防止 I/O 写入冲突)。这确实是一个令人尴尬的并行问题。【参考方案4】:我知道您专门询问过 Python,但我建议您查看 Hadoop (http://hadoop.apache.org/):它实现了专为解决此类问题而设计的 Map 和 Reduce 算法。
祝你好运
【讨论】:
你还不知道它是否是为这个问题而设计的。正如其他人指出的那样,我们对这个问题了解不够。 @ San Jacinto ... 我似乎读到“它们通常涉及读取大文件(大于 2gb),逐行处理它们,运行基本计算,然后写入结果”这已经足够好了对我来说,因为我没有给出具体的实现细节,而是一个一般性的观察。冷静点,伙计。 我以前用过hadoop和map/reduce。我都喜欢,并且 map/reduce 可以(并且在某种程度上)被应用在这里。 Hadoop 通过其 HFS (IIRC) 解决了一些 I/O 问题。我在询问 map/reduce 之前的步骤:采用哪种方法来划分文件以在其上映射函数。排队?文件可迭代? @Arrieta 没问题。也只是陈述一个观察结果:你回答了一个没有被问到的问题:)。如果你冒犯了,请把它重新读成“我们有”而不是“你有”。你会注意到,我没有投反对票。【参考方案5】:这在很大程度上取决于文件的格式。
在任何地方拆分它有意义吗?还是您需要将其拆分为新行?或者您是否需要确保在对象定义的末尾拆分它?
您应该在同一个文件上使用多个阅读器,而不是拆分文件,使用os.lseek
跳转到文件的适当部分。
更新:海报补充说他想在新行上拆分。然后我提出以下建议:
假设您有 4 个进程。然后简单的解决方案是 os.lseek 到文件的 0%、25%、50% 和 75%,并读取字节,直到你打到第一个新行。这是每个过程的起点。您不需要拆分文件来执行此操作,只需在每个进程中寻找大文件中的正确位置并从那里开始读取。
【讨论】:
我更新了评论以解释如何在您的情况下使用 os.lseek。【参考方案6】:Fredrik Lundh 的Some Notes on Tim Bray's Wide Finder Benchmark 是一本有趣的读物,关于一个非常相似的用例,提供了很多好的建议。其他许多作者也实现了相同的东西,其中一些链接自文章,但您可能想尝试在谷歌上搜索“python Wide finder”或其他内容以找到更多内容。 (还有一个基于 multiprocessing
模块的解决方案,但似乎不再可用)
【讨论】:
很遗憾,许多提交的来源难以追踪。从widefinder/widefinder2的条目中可以学到很多有用的技术。【参考方案7】:如果运行时间很长,则不要让每个进程通过Queue
读取其下一行,而是让进程读取成批的行。这样,开销将分摊到几行(例如数千行或更多行)。
【讨论】:
以上是关于在 Python 中划分大文件以进行多处理的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
在 Python/PySpark 中 Spark 复制数据框列的最佳实践?
python尝试不同的随机数进行数据划分使用卡方检验依次计算不同随机数划分下训练接和测试集所有分类特征的卡方检验的p值,如果所有p值都大于0.05则训练集和测试集都具有统计显著性数据划分合理