Python多处理:全局对象未正确复制给子对象

Posted

技术标签:

【中文标题】Python多处理:全局对象未正确复制给子对象【英文标题】:Python multiprocessing: Global objects not being copied to children properly 【发布时间】:2021-07-12 01:18:24 【问题描述】:

几天前,我回复了question on SO,关于并行读取 tar 文件。

这是问题的要点:

import bz2
import tarfile
from multiprocessing import Pool

tr = tarfile.open('data.tar')

def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return
    with tr.extractfile(tar_file_entry) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            # Reading bz2 file
            ....
            .... 


def process_serial():
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done i/len(members)')


def process_parallel():
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_serial() # No error
    process_parallel() # Error


if __name__ == '__main__':
    main()

正如the answer 中提到的那样,我们只需在子进程而不是父进程中打开 tar 文件即可使错误消失。

我无法理解为什么会这样。

即使我们在父进程中打开tarfile,子进程也会得到一个新的副本。 那么为什么在子进程中打开 tarfile 会显式地产生任何影响呢?

这是否意味着在第一种情况下,子进程以某种方式改变了公共 tarfile 对象并由于并发写入而导致内存损坏?

【问题讨论】:

open 创建一个绑定到进程的文件句柄。在类 UNIX 系统上,它只是一个数字。该数字对于其他进程并不意味着相同。 您可以在here987654323@这个主题上找到一篇有趣的帖子 当我回答您的原始问题时,我发布了代码,展示了如何初始化池中的每个进程以打开 tar 文件,就像您在上面尝试做的那样,每个进程只打开一次池中的进程,而不是每个被提取的成员。你试过运行代码吗? @Booboo 我不是问这个问题的人。我是回答它的人。我试过你的答案,效果很好。其实你和我的回答基本上是一样的。 @AnmolSinghJaggi 我似乎错过了这一点。我突然想到,正如 OP 应该在提出带有 regex 标记的问题时指定使用的语言一样,OP 应该在发布带有 multiprocessing 标记的问题时指定使用的平台。我之前的评论适用于使用spawn 的平台,例如Windows。在我对原始问题的回答中,我还建议 OP 使用 spawn 【参考方案1】:

FWIW,cmets wrt open 中的答案实际上在类 UNIX 系统上关于文件句柄编号是不正确的。

如果 multiprocessing 使用 fork()(它在 Linux 和类似系统下使用,尽管我读到在 macOS 上分叉存在问题),文件句柄和其他所有内容都会愉快地复制到子进程(通过“愉快地”我的意思是它在许多边缘情况下很复杂,例如分叉线程,但它仍然适用于文件句柄。

以下对我来说很好用:

import multiprocessing

this = open(__file__, 'r')


def read_file():
    print(len(this.read()))


def main():
    process = multiprocessing.Process(target=read_file)
    process.start()
    process.join()


if __name__ == '__main__':
    main()

问题可能是tarfile 在读取时具有内部结构和/或缓冲,您也可以通过尝试同时查找和读取同一存档的不同部分来简单地遇到冲突。即,我推测在这种情况下使用没有任何同步的线程池可能会遇到完全相同的问题。

编辑:澄清一下,从 Tar 存档中提取文件可能(我尚未检查确切的细节)如下:(1) 寻求封装部分(文件)的偏移量,(2)读取封装文件的一个块,将该块写入目标文件(或管道,或 w/e),(3)重复(2)直到整个文件提取出来的。

通过尝试以非同步方式从使用相同文件句柄的并行进程中执行此操作,可能会导致这些步骤的混合,即开始处理文件 #2 将远离文件 #1,而我们正在在读取文件#1等的中间

Edit2 回答下面的评论:内存表示是为子进程重新分叉的,这是真的;但是在内核端管理的资源(例如文件句柄和内核缓冲区)是共享的。

举例说明:

import multiprocessing

this = open(__file__, 'rb')


def read_file(worker):
    print(worker, this.read(80))


def main():
    processes = []

    for number in (1, 2):
        processes.append(
            multiprocessing.Process(target=read_file, args=(number,)))

    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

在 Linux 上运行我得到:

$ python3.8 test.py 
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n   "
2 b''

如果查找和读取是独立的,则两个过程都会打印相同的结果,但它们不会。由于这是一个小文件,并且 Python 选择缓冲少量数据(8 KiB),第一个进程读取到 EOF,而第二个进程没有数据要读取(除非它当然要回溯)。

【讨论】:

但是tarfile的内存表示应该重新复制到每个子进程;那么一个搜索将如何干扰另一个搜索?你的意思是磁盘上的实际tarfile吗?在那种情况下,OSX(或任何现代操作系统)不保证多个进程对单个文件的并发读取能力吗?事实上,这就是为什么第二个程序可以正常运行的原因! @AnmolSinghJaggi 查看更新后的答案;我的意思是磁盘上的实际源 tar 文件,在竞争读取和查找操作的工作人员之间共享。回答时,我确实假设选择 fork() 的 Unix Python,我的回答在 macOS 上可能无效:docs.python.org/3/library/… 你是对的。当我们使用 fork 方法时,文件偏移量在进程之间以某种方式共享。然而,在 spawn 下,我们得到了 2 个完全不同的文件句柄。如果你在程序顶部写multiprocessing.set_start_method('spawn'),你会注意到不同的输出。

以上是关于Python多处理:全局对象未正确复制给子对象的主要内容,如果未能解决你的问题,请参考以下文章

何时在面向对象的 php 中的超级全局变量中使用“@”? [复制]

python 多进程共享全局变量之Manager()

将状态传递给子组件时未定义状态

为啥多处理工作者不能更改全局变量? [复制]

多处理全局变量更新未返回给父级

AJAX 全局阵列存储