哪个是在 Python 中并行运行多个任务的最佳方式

Posted

技术标签:

【中文标题】哪个是在 Python 中并行运行多个任务的最佳方式【英文标题】:Which is the best way to parallelly running multiple tasks in Python 【发布时间】:2021-05-06 02:25:08 【问题描述】:

我有一个函数:

import time

def all_40k():
    for _ in range(400000):
        print('validate')
        print('parsing')
        print('inserting')
if __name__ == '__main__':
    start_time = time.time()
    all_40k()
    print(f'used time:time.time()-start_time')

输出是:

used time:9.545064210891724

因为同一个函数重复了 40k 次,所以我想让 4 个并行函数同时运行,每个函数运行 10k,理想情况下这会快 4 倍。

所以我首先尝试了多线程:

import threading
import time
def first_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


def second_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def third_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def forth_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

thread1 = threading.Thread(target=first_10k)
thread2 = threading.Thread(target=second_10k)
thread3 = threading.Thread(target=third_10k)
thread4 = threading.Thread(target=forth_10k)

thread1.start()
thread2.start()
thread3.start()
thread4.start()
if __name__ == '__main__':
    start_time = time.time()
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    print(f'used time:time.time()-start_time')

令我惊讶的是,输出是:

used time:23.058093309402466

然后我尝试了 asyncio:

import time
import asyncio

async def test_1():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_2():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_3():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_4():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def multiple_tasks():
  input_coroutines = [test_1(), test_2(), test_3(),test_4()]
  res = await asyncio.gather(*input_coroutines, return_exceptions=True)
  return res

if __name__ == '__main__':
  start_time = time.time()
  res1, res2 ,res3,res4 = asyncio.get_event_loop().run_until_complete(multiple_tasks())
  print(f'used time:time.time()-start_time')

输出是:

used time:9.295843601226807

最后我尝试了 ProcessPoolExecutor:

import time
from concurrent.futures import ProcessPoolExecutor
def data_handler(urls):
    for i in range(urls[0], urls[1]):
        print('validate')
        print('parsing')
        print('inserting')

def run():
    urls = [(1,100000),(100001,200000),(2000001,300000),(300001,400000)]
    with ProcessPoolExecutor() as excute:
        excute.map(data_handler,urls)

if __name__ == '__main__':
    start_time = time.time()
    run()
    stop_time = time.time()
    print('used time %s' % (stop_time - start_time))

输出是:

used time 12.726619243621826

那么我怎样才能加快这个过程呢?我想我走错路了。有朋友可以帮忙吗?最好的问候!

【问题讨论】:

新线程或进程需要一些时间来创建它,对于小数据最终可能需要更多时间。它对更大的数据很有用。使用线程也可能是一个问题,Python 有GIL,它一次只运行一个线程(不是同时运行),线程只能用于使用input-output 的问题,比如对网页或文件的请求。 asyncio 可能会出现类似的问题,它以类似的方式工作,并且仅适用于像 asyncio.sleep 这样的异步函数或从文件或网络读取数据的函数。对于print(),它不会给出结果。 最好在使用 url 从服务器获取数据而不是使用 print() 的真实代码上尝试它 - 它可能会更快地并行工作。你可以使用httpx,它使用asyncio 嗨@furas 非常感谢您的回复,在我使用了一些我发现的实际功能后,ProcessPoolExecutor 和线程都可以工作,节省了一半的时间。非常感谢! 【参考方案1】:

好的,你注意到了什么:

No parallelism   9.545064210891724
asyncio          9.295843601226807
multithreading   12.726619243621826
Thread Pool      23.058093309402466

首先,Asyncio 实际上并不使用线程,如果您能猜到,性能依赖于一些 I/O。 Asyncio 在一个循环中的任务之间交替,每当遇到await 时就会切换。如果不使用await,它将最终一次运行每个任务,并且根本不切换。

使用线程,由于Global Interpreter Lock,只有一个线程能够控制 Python 解释器。你在这里最终得到的是来自不同线程的一堆争用,它们都试图同时工作。这种上下文切换会降低您的应用程序的速度。与 asyncio 类似,如果您想在等待某些 I/O 时安排其他工作,您实际上只能获得这些加速。

好的,所以现在肯定多处理案例应该运行得更快.. 对吗?好吧,每个进程都有自己的解释器锁,但是,阻塞在您的print 语句中。每个进程都被阻止尝试将其输出发送到同一个控制台管道。让我举个例子。

假设我们有一个方法要运行 4 次。一次串行一次并行

def run(thread):
    print(f"Starting thread: thread")
    for i in range(500000):
        print('foobar')
    print(f"Finished thread: thread")


def run_singlethreaded():
    start_time = time.time()

    for thread in ["local"] * 4:
        run(thread)

    stop_time = time.time()
    return stop_time - start_time


def run_multiprocessing():
    start_time = time.time()

    with ProcessPoolExecutor(max_workers=4) as ex:
        ex.map(run, ["mp0", "mp1", "mp2", "mp3"])

    stop_time = time.time()
    return stop_time - start_time

if __name__ == '__main__':
    singlethreaded_time = run_singlethreaded()
    multiprocessing_time = run_multiprocessing()
    print(f"Finished singlethreaded in:  singlethreaded_time")
    print(f"Finished multiprocessing in: multiprocessing_time")

如果我们运行此程序并打印您会惊讶地看到的时间:

Finished singlethreaded in:  10.513998746871948
Finished multiprocessing in: 12.252000570297241

现在,如果我们将打印更改为更简单的内容,不会导致 IO 瓶颈:

def run(thread):
    print(f"Starting thread: thread")
    for i in range(100000000):
        pass
    print(f"Finished thread: thread")

您将获得预期的并行速度:

Finished singlethreaded in:  9.816999435424805
Finished multiprocessing in: 2.503000020980835

这里重要的一点是,在并行性可以帮助您之前,您需要了解您的资源在哪里受限。在 IO 绑定应用程序的情况下,线程或异步可能会有所帮助。在 CPU 密集型应用程序的情况下,多处理可能很有用。在其他时候,两者都不会真正帮助您(例如print 语句),因为瓶颈存在于应用程序外部的系统中。希望这会有所帮助!

【讨论】:

嗨@flakes 我真的很感谢你的回答,在我使用了一些我发现的真实功能之后,ProcessPoolExecutor 和线程都可以工作,节省了一半的时间。非常感谢! @William 很高兴为您提供帮助。作为一般规则,我更喜欢 asyncio 而不是多线程而不是多处理。多处理是一种更复杂的机制,对于非 CPU 绑定的应用程序通常会更慢。多处理还使共享数据更加复杂,因为每个进程都有独立的内存。 Asyncio 优于多线程,因为没有上下文切换。在许多情况下,Asyncio 可以更快且更简单,但缺点是您需要使用实现异步接口的 IO 绑定方法,但情况并非总是如此,尤其是对于某些第三方库。

以上是关于哪个是在 Python 中并行运行多个任务的最佳方式的主要内容,如果未能解决你的问题,请参考以下文章

python 之进程

多任务-线程

使用不同的参数并行运行相同的函数,并知道哪个并行运行在 python 中结束了

Python并发编程—进程

使用 python 3.6 将多个文件并行加载到内存中的最佳方法是啥?

Python:多任务,并发,并行的理解及线程进程的对比