运行一个并行进程,保存 Python 中主进程的结果

Posted

技术标签:

【中文标题】运行一个并行进程,保存 Python 中主进程的结果【英文标题】:Run a parallel process saving results from a main process in Python 【发布时间】:2021-05-19 08:36:32 【问题描述】:

我有一个函数可以为任务列表创建一些结果。我想将结果即时保存到 1) 释放内存,而不是保存到附加到 results_list 和 2) 有第一部分的结果以防出错。

这是一个非常短的示例代码:

for task in task_list:
    result = do_awesome_stuff_to_task(task)
    save_nice_results_to_db(result)  # Send this job to another process and let the main process continue

主进程有没有办法为 task_list 中的每个任务创建结果,并且每次创建结果时都将其发送到另一个处理器/线程以保存它,因此主循环可以继续而无需等待缓慢的保存过程?

我看过多处理,但这似乎主要是为了加快 task_list 上的循环,而不是允许辅助子进程完成其他部分的工作。我也研究过 asyncio,但这似乎主要用于 I/O。

总而言之,我正在寻找一种方法让主进程在 task_list 上循环。对于完成的每个任务,我想将结果发送到另一个子流程以保存结果。请注意,do_awesome_stuff_to_task 比保存过程快得多,因此,在保存第一个任务之前,主循环将通过多个任务。我想了两种方法来解决这个问题:

    使用多个子进程保存 保存每 xx 次迭代 - save_results 规模正常,所以也许保存过程可以在主循环继续时一次保存 xx 次迭代?

这可能与Python有关吗?在哪里查看以及需要考虑哪些关键因素?

感谢所有帮助。

【问题讨论】:

如果save_results 因磁盘速度而变慢,多处理可能会使情况变得更糟 我可能会因为多种原因变慢。无论如何,我想知道,Python 是否可以实现类似请求的操作 一般来说,在进程之间传递数据不是很快也不是很有效。您可以通过使用类似multiprocessing.shared_memory 的方式更快地传输信息来减少开销,但这并不简单。如果可以让save_results 释放 GIL,线程是你最好的选择。同样,如果您依赖于 spin rust,进行更多并发调用将导致整体吞吐量变慢 也许我应该澄清一下;我建议在任何时候需要在并发发生的事情之间传输大量数据时对多处理进行线程化,因为线程共享内存空间并且没有这样的“传输”。您需要保护对象免受并发访问,以免损坏数据,但不会产生传输开销。然而,GIL 限制了您可以从线程中获得的额外 CPU 收益,但是如果您使用释放 GIL 的东西,您可以解决这个问题(包括许多 IO 操作、numpy 等......) @Aaron 如果您将评论写为答案,我会将其标记为答案 - 也许您会给出一个使用线程以及如何释放 GIL 的简单示例? 【参考方案1】:

如果不进行测试,很难知道在您的情况下什么会更快,但这里有一些关于如何选择要做什么的想法。

如果save_nice_results_to_db 速度很慢,因为它正在将数据写入磁盘或网络,请确保您尚未达到硬件的最大写入速度。根据另一端的服务器,网络流量有时可以从一次打开多个端口进行读/写中受益匪浅,只要您保持在总网络传输速度(mac 接口和 ISP 的)范围内。 SSD 可以从一次启动多个读/写中看到一些有限的好处,但太多会损害性能。当尝试一次做多件事情时,HDD 几乎普遍较慢。每次读取/写入更大的块都更高效。

multiprocessing 通常必须使用pickle 在父进程和子进程之间传输数据,因为它们不共享内存。这有很高的开销,所以如果result 是一个大对象,那么将数据发送到子进程的额外开销可能会比任何并发性所节省的时间都多。 (强调可能。始终为自己测试)。从 3.8 开始,添加了 shared_memory 模块,它可能更高效,但灵活性和易用性要低得多。

threading 受益于所有线程共享内存,因此在线程之间“发送”数据的传输开销为零。然而,由于 GIL(全局解释器锁),Python 线程无法同时执行字节码,因此无法利用多个 CPU 内核来提高计算速度。这是因为 python 本身有很多不是线程安全的部分。用 c 编写的特定函数可能会释放此锁以解决此问题并使用线程利用多个 cpu 内核,但是一旦执行返回到 python 解释器,该锁将再次被持有。通常涉及网络访问或文件 IO 的函数可以释放 GIL,因为解释器正在等待通常是线程安全的操作系统调用。其他流行的库(如 Numpy)也在努力释放 GIL,同时对大型数组进行复杂的数学运算。但是,您只能从 c/c++ 代码中释放 GIL,而不能从 python 本身中释放 GIL。

asyncio 应该在这里特别提及,因为它是专门为并发网络/文件操作而设计的。它使用协程而不是线程(甚至比线程更低的开销,线程本身的开销比进程低得多)来排队一堆操作,然后使用操作系统调用来等待它们中的任何一个完成(事件循环)。使用它还需要您的 do_awesome_stuff_to_task 在协程中发生,以便它与 save_nice_results_to_db 同时发生。

将每个result 触发到要处理的线程的简单示例:

for task in task_list:
    result = do_awesome_stuff_to_task(task)
    threading.Thread(target=save_nice_results_to_db, args=(result,)).start()  # Send this job to another process and let the main process continue

【讨论】:

以上是关于运行一个并行进程,保存 Python 中主进程的结果的主要内容,如果未能解决你的问题,请参考以下文章

Python中主进程和子进程之间的动态通信

python 线程进程并发并行协程进程池互斥锁

python 线程进程并发并行协程进程池互斥锁

与 Python 并行运行子进程

Python3 系列之 并行编程

使用 python 子进程运行 GNU 并行命令