哪个是在 Python 中并行运行多个任务的最佳方式
Posted
技术标签:
【中文标题】哪个是在 Python 中并行运行多个任务的最佳方式【英文标题】:Which is the best way to parallelly running multiple tasks in Python 【发布时间】:2021-05-06 02:25:08 【问题描述】:我有一个函数:
import time
def all_40k():
for _ in range(400000):
print('validate')
print('parsing')
print('inserting')
if __name__ == '__main__':
start_time = time.time()
all_40k()
print(f'used time:time.time()-start_time')
输出是:
used time:9.545064210891724
因为同一个函数重复了 40k 次,所以我想让 4 个并行函数同时运行,每个函数运行 10k,理想情况下这会快 4 倍。
所以我首先尝试了多线程:
import threading
import time
def first_10k():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
def second_10k():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
def third_10k():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
def forth_10k():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
thread1 = threading.Thread(target=first_10k)
thread2 = threading.Thread(target=second_10k)
thread3 = threading.Thread(target=third_10k)
thread4 = threading.Thread(target=forth_10k)
thread1.start()
thread2.start()
thread3.start()
thread4.start()
if __name__ == '__main__':
start_time = time.time()
thread1.join()
thread2.join()
thread3.join()
thread4.join()
print(f'used time:time.time()-start_time')
令我惊讶的是,输出是:
used time:23.058093309402466
然后我尝试了 asyncio:
import time
import asyncio
async def test_1():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
async def test_2():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
async def test_3():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
async def test_4():
for _ in range(100000):
print('validate')
print('parsing')
print('inserting')
async def multiple_tasks():
input_coroutines = [test_1(), test_2(), test_3(),test_4()]
res = await asyncio.gather(*input_coroutines, return_exceptions=True)
return res
if __name__ == '__main__':
start_time = time.time()
res1, res2 ,res3,res4 = asyncio.get_event_loop().run_until_complete(multiple_tasks())
print(f'used time:time.time()-start_time')
输出是:
used time:9.295843601226807
最后我尝试了 ProcessPoolExecutor:
import time
from concurrent.futures import ProcessPoolExecutor
def data_handler(urls):
for i in range(urls[0], urls[1]):
print('validate')
print('parsing')
print('inserting')
def run():
urls = [(1,100000),(100001,200000),(2000001,300000),(300001,400000)]
with ProcessPoolExecutor() as excute:
excute.map(data_handler,urls)
if __name__ == '__main__':
start_time = time.time()
run()
stop_time = time.time()
print('used time %s' % (stop_time - start_time))
输出是:
used time 12.726619243621826
那么我怎样才能加快这个过程呢?我想我走错路了。有朋友可以帮忙吗?最好的问候!
【问题讨论】:
新线程或进程需要一些时间来创建它,对于小数据最终可能需要更多时间。它对更大的数据很有用。使用线程也可能是一个问题,Python 有GIL
,它一次只运行一个线程(不是同时运行),线程只能用于使用input-output
的问题,比如对网页或文件的请求。 asyncio
可能会出现类似的问题,它以类似的方式工作,并且仅适用于像 asyncio.sleep
这样的异步函数或从文件或网络读取数据的函数。对于print()
,它不会给出结果。
最好在使用 url 从服务器获取数据而不是使用 print()
的真实代码上尝试它 - 它可能会更快地并行工作。你可以使用httpx,它使用asyncio
嗨@furas 非常感谢您的回复,在我使用了一些我发现的实际功能后,ProcessPoolExecutor 和线程都可以工作,节省了一半的时间。非常感谢!
【参考方案1】:
好的,你注意到了什么:
No parallelism 9.545064210891724
asyncio 9.295843601226807
multithreading 12.726619243621826
Thread Pool 23.058093309402466
首先,Asyncio 实际上并不使用线程,如果您能猜到,性能依赖于一些 I/O。 Asyncio 在一个循环中的任务之间交替,每当遇到await
时就会切换。如果不使用await
,它将最终一次运行每个任务,并且根本不切换。
使用线程,由于Global Interpreter Lock,只有一个线程能够控制 Python 解释器。你在这里最终得到的是来自不同线程的一堆争用,它们都试图同时工作。这种上下文切换会降低您的应用程序的速度。与 asyncio 类似,如果您想在等待某些 I/O 时安排其他工作,您实际上只能获得这些加速。
好的,所以现在肯定多处理案例应该运行得更快.. 对吗?好吧,每个进程都有自己的解释器锁,但是,阻塞在您的print
语句中。每个进程都被阻止尝试将其输出发送到同一个控制台管道。让我举个例子。
假设我们有一个方法要运行 4 次。一次串行一次并行
def run(thread):
print(f"Starting thread: thread")
for i in range(500000):
print('foobar')
print(f"Finished thread: thread")
def run_singlethreaded():
start_time = time.time()
for thread in ["local"] * 4:
run(thread)
stop_time = time.time()
return stop_time - start_time
def run_multiprocessing():
start_time = time.time()
with ProcessPoolExecutor(max_workers=4) as ex:
ex.map(run, ["mp0", "mp1", "mp2", "mp3"])
stop_time = time.time()
return stop_time - start_time
if __name__ == '__main__':
singlethreaded_time = run_singlethreaded()
multiprocessing_time = run_multiprocessing()
print(f"Finished singlethreaded in: singlethreaded_time")
print(f"Finished multiprocessing in: multiprocessing_time")
如果我们运行此程序并打印您会惊讶地看到的时间:
Finished singlethreaded in: 10.513998746871948
Finished multiprocessing in: 12.252000570297241
现在,如果我们将打印更改为更简单的内容,不会导致 IO 瓶颈:
def run(thread):
print(f"Starting thread: thread")
for i in range(100000000):
pass
print(f"Finished thread: thread")
您将获得预期的并行速度:
Finished singlethreaded in: 9.816999435424805
Finished multiprocessing in: 2.503000020980835
这里重要的一点是,在并行性可以帮助您之前,您需要了解您的资源在哪里受限。在 IO 绑定应用程序的情况下,线程或异步可能会有所帮助。在 CPU 密集型应用程序的情况下,多处理可能很有用。在其他时候,两者都不会真正帮助您(例如print
语句),因为瓶颈存在于应用程序外部的系统中。希望这会有所帮助!
【讨论】:
嗨@flakes 我真的很感谢你的回答,在我使用了一些我发现的真实功能之后,ProcessPoolExecutor 和线程都可以工作,节省了一半的时间。非常感谢! @William 很高兴为您提供帮助。作为一般规则,我更喜欢 asyncio 而不是多线程而不是多处理。多处理是一种更复杂的机制,对于非 CPU 绑定的应用程序通常会更慢。多处理还使共享数据更加复杂,因为每个进程都有独立的内存。 Asyncio 优于多线程,因为没有上下文切换。在许多情况下,Asyncio 可以更快且更简单,但缺点是您需要使用实现异步接口的 IO 绑定方法,但情况并非总是如此,尤其是对于某些第三方库。以上是关于哪个是在 Python 中并行运行多个任务的最佳方式的主要内容,如果未能解决你的问题,请参考以下文章
使用不同的参数并行运行相同的函数,并知道哪个并行运行在 python 中结束了