Python 多处理与多线程相结合

Posted

技术标签:

【中文标题】Python 多处理与多线程相结合【英文标题】:Python Multiprocessing combined with Multithreading 【发布时间】:2015-02-11 20:14:58 【问题描述】:

我不确定我想要做的是否是一种有效的做法,但它是这样的: 我需要我的程序高度并行化,所以我认为我可以创建 2-3 个进程,每个进程可以有 2-3 个线程。

1) 这可能吗? 2)这有什么意义吗? 3) 这是我的代码,但是当我尝试加入进程时它挂起。

PQ = multiprocessing.Queue()

[...]

def node(self, files, PQ):

        l1, l2 = self.splitList(files)
        p1 = multiprocessing.Process(target=self.filePro, args=(l1,PQ,))
        p2 = multiprocessing.Process(target=self.filePro, args=(l2,PQ,))
        p1.daemon = True
        p2.daemon = True
        p1.start()
        p2.start()

        p1.join() # HANGS HERE
        p2.join()
        while 1:
            if PQ.empty():
                break
            else:
                print(PQ.get())
        PQ.join()

    def filePro(self,lst,PQ):
        TQ = queue.Queue()
        l1, l2 = self.splitList(lst)
        t1 = threading.Thread(target=self.fileThr, args=('a',l1,TQ,))
        t2 = threading.Thread(target=self.fileThr, args=('b',l2,TQ,))
        t1.daemon = True
        t2.daemon = True
        t1.start()
        t2.start()

        t1.join()
        t2.join()
        while 1:
            if TQ.empty():
                break
            else:
                PQ.put(TQ.get())
                TQ.task_done()
        TQ.join()

def fileThr(self,id,lst,TQ):
        while lst:
            tmp_path = lst.pop()
            if (not tmp_path[1]):
                continue
            for item in tmp_path[1]:
                TQ.put(1)
        TQ.join()

【问题讨论】:

当我需要最大化 cpu 使用率时我使用进程,当我有磁盘访问、网络等阻塞操作时我使用线程。所以如果我有一个脚本来下载许多文件,我会创建一个池线程并使用它。如果我有一个 CPU 峰值的分布式计算,我会使用一个进程池。 如果您希望我们调试您的代码,我们需要minimal, complete, verifiable example。 【参考方案1】:

1) 这可能吗?

是的。


2) 这有什么意义吗?

是的。但通常不是您要寻找的重点。

首先,几乎每个现代操作系统都使用“平面”调度程序;分散在 3 个程序中的 8 个线程或分散在 8 个程序中的 8 个线程之间没有区别。*

* 某些程序可以在您知道您只与来自同一程序的线程共享的某些地方谨慎使用进程内锁或其他同步原语,从而获得显着的好处——当然,通过避免在这些地方共享内存——但你不会通过将作业均匀地分布在线程之间以及将线程均匀地分布在进程中来获得这种好处。

其次,即使您使用的是旧版 SunOS,在默认的 CPython 解释器中,全局解释器锁 (GIL) 也可确保一次只能有一个线程运行 Python 代码。如果您花时间运行显式释放 GIL 的 C 扩展库中的代码(如一些 NumPy 函数),线程可以提供帮助,但除此之外,它们最终都会被序列化。

线程和进程一起使用的主要情况是您同时进行 CPU 密集型和 I/O 密集型工作。在这种情况下,通常一个人在喂另一个人。如果 I/O 为 CPU 供电,则在主进程中使用单个线程池来处理 I/O,然后使用工作进程池来对结果进行 CPU 工作。如果是相反的,使用一个工作进程池来做 CPU 工作,然后让每个工作进程使用一个线程池来做 I/O。


3) 这是我的代码,但是当我尝试加入进程时它挂起。

如果您不提供minimal, complete, verifiable example,则很难调试代码。

但是,我可以看到一个明显的问题。

您正在尝试使用 TQ 作为生产者-消费者队列,t1t2 作为生产者,filePro 父作为消费者。在t1.join()t2.join() 返回之前,您的消费者不会调用TQ.task_done(),这在这些线程完成之前不会发生。但是那些制作人不会完成,因为他们在等你打电话给TQ.task_done()。所以,你遇到了僵局。

而且,由于您的每个子进程的主线程都处于死锁状态,它们永远不会完成,因此p1.join() 将永远阻塞。

如果你真的希望主线程等到其他线程完成后再做任何工作,你不需要生产者-消费者习惯用法;只需让孩子们完成他们的工作并退出而不调用TQ.join(),并且不要在父母中打扰TQ.task_done()。 (请注意,您已经使用 PQ 正确执行此操作。)

另一方面,如果您希望它们并行工作,则在完成循环之前不要尝试join 子线程。

【讨论】:

谢谢!这是一个非常完整的答案,但是现在我对您的第二个答案还有 1 个问题。 1)关于 GIL,这是否意味着如果我生成 30 个线程,它将与生成 1 个线程相同?因为你说它们最终都会被序列化...... @AngeloUknown:不,不完全相同。你没有得到并行性。也就是说,即使您有 32 个内核,使用 30 个线程也不会比使用 1 个线程运行得更快。但是您确实获得了并发。线程自动在你的任务之间交错工作——它们可以自动处理阻塞——例如,如果一个线程正在等待 I/O,系统将安排另一个线程运行而不是阻塞整个程序。除非您编写显式死锁的代码(如您的示例中所示),否则一个线程不会阻止另一个线程继续进行。 @AngeloUknown:我找不到一个很好的资源来讨论 Python 特定术语的差异,但是如果你忽略 Haskell 特定的东西,Haskell wiki 上的 Parallelism vs. Concurrency 是一个很好的概述. 您已经提到 - “全局解释器锁 (GIL) 确保一次只能有一个线程运行 Python 代码”。你的意思是每个进程一个 GIL?【参考方案2】:

我比较了以下 3 种方法在 IO+CPU 和严格的 CPU 昂贵阻塞任务上的行为:

仅限多处理 仅多线程 两者都使用fast_map 函数组合

当使用多处理和多线程组合时,IO+CPU 昂贵任务的结果显示速度显着提高。 “-1”表示 ProcessPoolExecutor 由于“打开的文件太多”而失败。

严格 CPU 昂贵任务的结果表明,多处理本身稍快。

fast_map 函数为每个 cpu-core*2 生成一个进程,并在每个进程中创建足够数量的线程以实现完全并发(除非提供了 threads_limit 参数)。源代码、测试代码更多信息可从fast_map GitHub page获取。

如果有人想玩弄它或只是实际使用它,可以通过以下方式获得它:

python3 -m pip install fast_map

并像这样使用:

from fast_map import fast_map
import time

def wait_and_square(x):
    time.sleep(1)
    return x*x

for i in fast_map(wait_and_square, range(8), threads_limit=None):
    print(i)

【讨论】:

以上是关于Python 多处理与多线程相结合的主要内容,如果未能解决你的问题,请参考以下文章

多线程与多进程的比较

单例模式与多线程

Python多线程与多进程

将 asyncio 与多处理结合起来会出现啥样的问题(如果有的话)?

Python线程队列与多处理管道

将多线程和多处理与 concurrent.futures 相结合