如何并行化一个简单的 Python 循环?
Posted
技术标签:
【中文标题】如何并行化一个简单的 Python 循环?【英文标题】:How do I parallelize a simple Python loop? 【发布时间】:2012-04-04 21:05:28 【问题描述】:这可能是一个微不足道的问题,但是如何在 python 中并行化以下循环?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
我知道如何在 Python 中启动单线程,但我不知道如何“收集”结果。
多个进程也可以 - 在这种情况下最简单的。我目前使用的是 Linux,但代码也应该在 Windows 和 Mac 上运行。
并行化此代码的最简单方法是什么?
【问题讨论】:
一个非常简单的并行化for
循环的解决方案尚未被提及作为答案 - 这将通过使用 deco
包简单地装饰两个函数
【参考方案1】:
由于全局解释器锁 (GIL),在 CPython 上使用多个线程不会为纯 Python 代码提供更好的性能。我建议改用multiprocessing
模块:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
请注意,这在交互式解释器中不起作用。
为了避免 GIL 周围的常见 FUD:无论如何,在这个示例中使用线程不会有任何优势。您想要在这里使用进程,而不是线程,因为它们可以避免一大堆问题。
【讨论】:
既然这是选择的答案,是否有可能有一个更全面的例子?calc_stuff
的论据是什么?
@EduardoPignatelli 请阅读multiprocessing
模块的文档以获得更全面的示例。 Pool.map()
基本上像 map()
一样工作,但是是并行的。
有没有办法简单地在这个代码结构中添加一个 tqdm 加载栏?我使用了 tqdm(pool.imap(calc_stuff, range(0, 10 * offset, offset))) 但我没有得到完整的加载条图形。
@user8188120 我以前从未听说过 tqdm,很抱歉,我无能为力。
为了避免其他人落入我刚刚做的陷阱 - 池的实例化和 pool.map
的调用需要在函数内:***.com/questions/32995897/…【参考方案2】:
from joblib import Parallel, delayed
def process(i):
return i * i
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results) # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
以上在我的机器上运行良好(Ubuntu,包 joblib 已预先安装,但可以通过 pip install joblib
安装)。
取自https://blog.dominodatalab.com/simple-parallelization/
2021 年 3 月 31 日编辑:joblib
、multiprocessing
、threading
和 asyncio
上面代码中的joblib
在后台使用import multiprocessing
(因此是多个进程,这通常是跨内核运行 CPU 工作的最佳方式 - 因为 GIL)
您可以让joblib
使用多个线程而不是多个进程,但这(或直接使用import threading
)只有在线程在 I/O 上花费大量时间(例如读/写磁盘、发送HTTP 请求)。对于 I/O 工作,GIL 不会阻塞另一个线程的执行
从 Python 3.7 开始,作为 threading
的替代方案,您可以使用 asyncio 并行工作,但同样的建议适用于 import threading
(尽管与后者相比,仅使用 1 个线程;在另外,asyncio
有很多不错的功能,对异步编程很有帮助)
使用多个进程会产生开销。想一想:通常,每个进程都需要初始化/加载运行计算所需的一切。您需要检查自己是否上述代码 sn-p 提高了您的挂墙时间。这是另一个,我确认joblib
产生更好的结果:
import time
from joblib import Parallel, delayed
def countdown(n):
while n>0:
n -= 1
return n
t = time.time()
for _ in range(20):
print(countdown(10**7), end=" ")
print(time.time() - t)
# takes ~10.5 seconds on medium sized Macbook Pro
t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro
【讨论】:
我尝试了您的代码,但在我的系统上,此代码的顺序版本大约需要半分钟,而上述并行版本需要 4 分钟。为什么会这样? 感谢您的回答!我认为这是 2019 年最优雅的方式。 @tyrex 感谢分享!这个 joblib 包很棒,这个例子对我有用。不过,不幸的是,在更复杂的情况下,我遇到了一个错误。 github.com/joblib/joblib/issues/949 @shaifaliGupta 我认为这实际上取决于您的函数 processInput 每个样本需要多长时间。如果每个 i 的时间都很短,您将看不到任何改进。我实际上尝试了代码,看看函数 processInput 是否需要很少的时间,然后 for-loops 实际上执行得更好。但是,如果您的函数 processInput 需要很长时间才能运行。使用这种并行方法要优越得多。 这行得通,但对于任何试图在 windows 上使用它并通过 jupyter notebook 显示输出的人,你会在这里遇到问题***.com/questions/55955330/…【参考方案3】:为了并行化一个简单的 for 循环,joblib 为原始使用多处理带来了很多价值。不仅是简短的语法,而且还包括在迭代非常快时(以消除开销)或捕获子进程的回溯,以提供更好的错误报告。
免责声明:我是joblib的原作者。
【讨论】:
我用 jupyter 尝试了 joblib,它不起作用。在并行延迟调用之后,页面停止工作。 您好,我在使用 joblib (***.com/questions/52166572/…) 时遇到问题,您知道可能是什么原因吗?非常感谢。 好像我想试一试!是否可以将它与双循环一起使用,例如 for i in range(10): for j in range(20)【参考方案4】:并行化此代码的最简单方法是什么?
使用来自concurrent.futures
的 PoolExecutor。将原始代码与此并排比较。首先,最简洁的方法是使用executor.map
:
...
with ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(calc_stuff, parameters):
...
或通过单独提交每个调用来分解:
...
with ThreadPoolExecutor() as executor:
futures = []
for parameter in parameters:
futures.append(executor.submit(calc_stuff, parameter))
for future in futures:
out1, out2, out3 = future.result() # this will block
...
离开上下文向执行者发出释放资源的信号
您可以使用线程或进程并使用完全相同的接口。
一个工作示例
这里是工作示例代码,它将展示 :
把它放在一个文件中——futuretest.py:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection
def processor_intensive(arg):
def fib(n): # recursive, processor intensive calculation (avoid n > 36)
return fib(n-1) + fib(n-2) if n > 1 else n
start = time()
result = fib(arg)
return time() - start, result
def io_bound(arg):
start = time()
con = HTTPSConnection(arg)
con.request('GET', '/')
result = con.getresponse().getcode()
return time() - start, result
def manager(PoolExecutor, calc_stuff):
if calc_stuff is io_bound:
inputs = ('python.org', '***.com', 'stackexchange.com',
'noaa.gov', 'parler.com', 'aaronhall.dev')
else:
inputs = range(25, 32)
timings, results = list(), list()
start = time()
with PoolExecutor() as executor:
for timing, result in executor.map(calc_stuff, inputs):
# put results into correct output list:
timings.append(timing), results.append(result)
finish = time()
print(f'calc_stuff.__name__, PoolExecutor.__name__')
print(f'wall time to execute: finish-start')
print(f'total of timings for each call: sum(timings)')
print(f'time saved by parallelizing: sum(timings) - (finish-start)')
print(dict(zip(inputs, results)), end = '\n\n')
def main():
for computation in (processor_intensive, io_bound):
for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
manager(pool_executor, calc_stuff=computation)
if __name__ == '__main__':
main()
这是python -m futuretest
的一次运行的输出:
processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269
processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269
io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
'python.org': 301, '***.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200
io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
'python.org': 301, '***.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200
处理器密集型分析
在 Python 中执行处理器密集型计算时,预计 ProcessPoolExecutor
比 ThreadPoolExecutor
的性能更高。
由于 Global Interpreter Lock(又名 GIL),线程不能使用多个处理器,因此预计每次计算的时间和 wall time(经过的实时)会更长。
IO 绑定分析
另一方面,在执行 IO 绑定操作时,期望 ThreadPoolExecutor
比 ProcessPoolExecutor
性能更高。
Python 的线程是真实的,操作系统,线程。操作系统可以让它们进入睡眠状态,并在它们的信息到达时重新唤醒它们。
最后的想法
我怀疑多处理在 Windows 上会更慢,因为 Windows 不支持分叉,所以每个新进程都需要时间来启动。
您可以在多个进程中嵌套多个线程,但建议不要使用多个线程来拆分多个进程。
如果在 Python 中遇到繁重的处理问题,您可以通过额外的进程轻松扩展 - 但使用线程就不行了。
【讨论】:
ThreadPoolExecutor 是否绕过了 GIL 的限制?您也不需要 join() 来等待执行程序完成,或者这是否在上下文管理器中隐式处理 不,不,是“隐式处理” 出于某种原因,当扩大问题规模时,多线程处理速度非常快,但多处理会产生一堆卡住的进程(在 macOS 中)。知道为什么会这样吗?该过程仅包含嵌套循环和数学,没有任何异国情调。 @komodovaran_ 一个进程是一个完整的 Python 进程,每个进程一个,而一个线程只是一个执行线程,它有自己的堆栈,共享进程、它的字节码和它在内存中的所有其他内容所有其他线程 - 这有帮助吗? 感谢您提供了一个完整的示例【参考方案5】:这是最简单的方法!
您可以使用 asyncio。 (可以找到文档here)。它被用作多个 Python 异步框架的基础,提供高性能网络和 Web 服务器、数据库连接库、分布式任务队列等。此外,它具有高级和低级 API 以适应任何类型的问题.
import asyncio
def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def your_function(argument):
#code
现在这个函数将在调用时并行运行,而不会使主程序进入等待状态。您也可以使用它来并行化 for 循环。当调用 for 循环时,虽然循环是顺序的,但每次迭代都会在解释器到达主程序时并行运行。 例如:
@background
def your_function(argument):
time.sleep(5)
print('function finished for '+str(argument))
for i in range(10):
your_function(i)
print('loop finished')
这会产生以下输出:
loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
【讨论】:
谢谢!我同意这是最简单的方法 想象你在 your_function() 中有不同的打印,有没有办法强制它执行所有打印然后传递到 for 循环中的下一个 i? 很好的例子,有没有办法在最终打印之前等待 -print('loop finished')
你有没有找到最后打印“循环完成”的方法?
请注意,使用asyncio
完全是浪费。 asyncio
的重点是高效地运行异步 (async
/await
) 代码,对于其他一切它只会增加开销。 .run_in_executor(None, ...)
只是封装了一个concurrent.futures
线程池,也可以直接使用。【参考方案6】:
使用Ray 有很多好处:
除了多核之外,您还可以在多台机器上进行并行处理(使用相同的代码)。 通过共享内存(和零拷贝序列化)有效处理数值数据。 分布式调度的高任务吞吐量。 容错。在你的情况下,你可以启动 Ray 并定义一个远程函数
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
然后并行调用它
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
要在集群上运行相同的示例,唯一会改变的行是对 ray.init() 的调用。相关文档可以在here找到。
请注意,我正在帮助开发 Ray。
【讨论】:
对于任何考虑使用 ray 的人来说,知道它本身并不支持 Windows 可能是相关的。使用 WSL(适用于 Linux 的 Windows 子系统)的一些 hack 让它在 Windows 中工作是可能的,但如果你想使用 Windows,它几乎不是开箱即用的。 遗憾的是它还不支持 Python 3.9。【参考方案7】:我发现joblib
对我很有用。请看下面的例子:
from joblib import Parallel, delayed
def yourfunction(k):
s=3.14*k*k
print "Area of a circle with a radius ", k, " is:", s
element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))
n_jobs=-1:使用所有可用内核
【讨论】:
您知道,最好在发布自己的答案之前检查已经存在的答案。 This answer 也建议使用joblib
。【参考方案8】:
为什么不使用线程和一个互斥锁来保护一个全局列表?
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
请记住,您将与最慢的线程一样快
【讨论】:
我知道这是一个非常古老的答案,所以突然得到一个随机的反对票是很糟糕的。我只投了反对票,因为线程不会并行化任何东西。由于全局解释器锁,Python 中的线程一次只能绑定到一个在解释器上执行的线程,因此它们支持concurrent programming, but not parallel OP 请求。 @skrrgwasme 我知道你知道这一点,但是当你使用“它们不会并行化任何东西”这个词时,这可能会误导读者。如果操作需要很长时间,因为它们是 IO 绑定的,或者在等待事件时处于休眠状态,那么解释器就会被释放以运行其他线程,因此这将导致人们在这些情况下所希望的速度提高。只有 CPU 绑定线程真正受到 skrrgwasme 所说的影响。【参考方案9】:感谢@iuryxavier
from multiprocessing import Pool
from multiprocessing import cpu_count
def add_1(x):
return x + 1
if __name__ == "__main__":
pool = Pool(cpu_count())
results = pool.map(add_1, range(10**12))
pool.close() # 'TERM'
pool.join() # 'KILL'
【讨论】:
-1。这是一个仅限代码的答案。我建议添加一个解释,告诉读者您发布的代码是做什么的,也许他们可以在哪里找到更多信息。【参考方案10】:短线期货;我很惊讶还没有人提到它。 . .
from dask.distributed import Client
client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)
def my_function(i):
output = <code to execute in the for loop here>
return output
futures = []
for i in <whatever you want to loop across here>:
future = client.submit(my_function, i)
futures.append(future)
results = client.gather(futures)
client.close()
【讨论】:
【参考方案11】:假设我们有一个异步函数
async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
# Do some async procesing
这需要在大型阵列上运行。一些属性被传递给程序,一些被数组中字典元素的属性使用。
async def process_students(self, student_name: str, loop):
market = sys.argv[2]
subjects = [...] #Some large array
batchsize = 5
for i in range(0, len(subjects), batchsize):
batch = subjects[i:i+batchsize]
await asyncio.gather(*(self.work_async(student_name,
sub['Code'],
loop)
for sub in batch))
【讨论】:
【参考方案12】:这在用 Python 实现多处理和并行/分布式计算时可能很有用。
YouTube tutorial on using techila package
Techila 是一个分布式计算中间件,它使用 techila 包直接与 Python 集成。包中的 peach 函数可用于并行化循环结构。 (以下代码sn-p来自Techila Community Forums)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
【讨论】:
虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接的答案可能会失效。 @S.L.Barth 感谢您的反馈。我在答案中添加了一个小示例代码。【参考方案13】:tqdm library 的 concurrent 包装器是并行化长时间运行代码的好方法。 tqdm 通过智能进度表提供当前进度和剩余时间的反馈,我发现这对于长时间计算非常有用。
循环可以重写为通过简单调用thread_map
作为并发线程运行,或通过简单调用process_map
作为并发多进程运行:
from tqdm.contrib.concurrent import thread_map, process_map
def calc_stuff(num, multiplier):
import time
time.sleep(1)
return num, num * multiplier
if __name__ == "__main__":
# let's parallelize this for loop:
# results = [calc_stuff(i, 2) for i in range(64)]
loop_idx = range(64)
multiplier = [2] * len(loop_idx)
# either with threading:
results_threading = thread_map(calc_stuff, loop_idx, multiplier)
# or with multi-processing:
results_processes = process_map(calc_stuff, loop_idx, multiplier)
【讨论】:
【参考方案14】:看看这个;
http://docs.python.org/library/queue.html
这可能不是正确的方法,但我会做类似的事情;
实际代码;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == '__main__':
Main()
希望对您有所帮助。
【讨论】:
【参考方案15】:并行处理的一个非常简单的例子是
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter=parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
【讨论】:
这里的for循环没有并行性,你只是产生了一个运行整个循环的进程;这不是 OP 的意图。以上是关于如何并行化一个简单的 Python 循环?的主要内容,如果未能解决你的问题,请参考以下文章
如何并行化具有多个参数的简单 python def [重复]
使用 CUDA 在 python 中展开一个可并行化的 for 循环