如何并行化一个简单的 Python 循环?

Posted

技术标签:

【中文标题】如何并行化一个简单的 Python 循环?【英文标题】:How do I parallelize a simple Python loop? 【发布时间】:2012-04-04 21:05:28 【问题描述】:

这可能是一个微不足道的问题,但是如何在 python 中并行化以下循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在 Python 中启动单线程,但我不知道如何“收集”结果。

多个进程也可以 - 在这种情况下最简单的。我目前使用的是 Linux,但代码也应该在 Windows 和 Mac 上运行。

并行化此代码的最简单方法是什么?

【问题讨论】:

一个非常简单的并行化 for 循环的解决方案尚未被提及作为答案 - 这将通过使用 deco 包简单地装饰两个函数 【参考方案1】:

由于全局解释器锁 (GIL),在 CPython 上使用多个线程不会为纯 Python 代码提供更好的性能。我建议改用multiprocessing 模块:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

请注意,这在交互式解释器中不起作用。

为了避免 GIL 周围的常见 FUD:无论如何,在这个示例中使用线程不会有任何优势。您想要在这里使用进程,而不是线程,因为它们可以避免一大堆问题。

【讨论】:

既然这是选择的答案,是否有可能有一个更全面的例子? calc_stuff 的论据是什么? @EduardoPignatelli 请阅读multiprocessing 模块的文档以获得更全面的示例。 Pool.map() 基本上像 map() 一样工作,但是是并行的。 有没有办法简单地在这个代码结构中添加一个 tqdm 加载栏?我使用了 tqdm(pool.imap(calc_stuff, range(0, 10 * offset, offset))) 但我没有得到完整的加载条图形。 @user8188120 我以前从未听说过 tqdm,很抱歉,我无能为力。 为了避免其他人落入我刚刚做的陷阱 - 池的实例化和 pool.map 的调用需要在函数内:***.com/questions/32995897/…【参考方案2】:
from joblib import Parallel, delayed
def process(i):
    return i * i
    
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

以上在我的机器上运行良好(Ubuntu,包 joblib 已预先安装,但可以通过 pip install joblib 安装)。

取自https://blog.dominodatalab.com/simple-parallelization/


2021 年 3 月 31 日编辑:joblibmultiprocessingthreadingasyncio

上面代码中的joblib 在后台使用import multiprocessing(因此是多个进程,这通常是跨内核运行 CPU 工作的最佳方式 - 因为 GIL) 您可以让joblib 使用多个线程而不是多个进程,但这(或直接使用import threading)只有在线程在 I/O 上花费大量时间(例如读/写磁盘、发送HTTP 请求)。对于 I/O 工作,GIL 不会阻塞另一个线程的执行 从 Python 3.7 开始,作为 threading 的替代方案,您可以使用 asyncio 并行工作,但同样的建议适用于 import threading(尽管与后者相比,仅使用 1 个线程;在另外,asyncio 有很多不错的功能,对异步编程很有帮助) 使用多个进程会产生开销。想一想:通常,每个进程都需要初始化/加载运行计算所需的一切。您需要检查自己是否上述代码 sn-p 提高了您的挂墙时间。这是另一个,我确认joblib 产生更好的结果:
import time
from joblib import Parallel, delayed

def countdown(n):
    while n>0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro

【讨论】:

我尝试了您的代码,但在我的系统上,此代码的顺序版本大约需要半分钟,而上述并行版本需要 4 分钟。为什么会这样? 感谢您的回答!我认为这是 2019 年最优雅的方式。 @tyrex 感谢分享!这个 joblib 包很棒,这个例子对我有用。不过,不幸的是,在更复杂的情况下,我遇到了一个错误。 github.com/joblib/joblib/issues/949 @shaifaliGupta 我认为这实际上取决于您的函数 processInput 每个样本需要多长时间。如果每个 i 的时间都很短,您将看不到任何改进。我实际上尝试了代码,看看函数 processInput 是否需要很少的时间,然后 for-loops 实际上执行得更好。但是,如果您的函数 processInput 需要很长时间才能运行。使用这种并行方法要优越得多。 这行得通,但对于任何试图在 windows 上使用它并通过 jupyter notebook 显示输出的人,你会在这里遇到问题***.com/questions/55955330/…【参考方案3】:

为了并行化一个简单的 for 循环,joblib 为原始使用多处理带来了很多价值。不仅是简短的语法,而且还包括在迭代非常快时(以消除开销)或捕获子进程的回溯,以提供更好的错误报告。

免责声明:我是joblib的原作者。

【讨论】:

我用 jupyter 尝试了 joblib,它不起作用。在并行延迟调用之后,页面停止工作。 您好,我在使用 joblib (***.com/questions/52166572/…) 时遇到问题,您知道可能是什么原因吗?非常感谢。 好像我想试一试!是否可以将它与双循环一起使用,例如 for i in range(10): for j in range(20)【参考方案4】:

并行化此代码的最简单方法是什么?

使用来自concurrent.futures 的 PoolExecutor。将原始代码与此并排比较。首先,最简洁的方法是使用executor.map

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

或通过单独提交每个调用来分解:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

离开上下文向执行者发出释放资源的信号

您可以使用线程或进程并使用完全相同的接口。

一个工作示例

这里是工作示例代码,它将展示 :

把它放在一个文件中——futuretest.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', '***.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'calc_stuff.__name__, PoolExecutor.__name__')
    print(f'wall time to execute: finish-start')
    print(f'total of timings for each call: sum(timings)')
    print(f'time saved by parallelizing: sum(timings) - (finish-start)')
    print(dict(zip(inputs, results)), end = '\n\n')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

这是python -m futuretest 的一次运行的输出:

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
'python.org': 301, '***.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
'python.org': 301, '***.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200

处理器密集型分析

在 Python 中执行处理器密集型计算时,预计 ProcessPoolExecutorThreadPoolExecutor 的性能更高。

由于 Global Interpreter Lock(又名 GIL),线程不能使用多个处理器,因此预计每次计算的时间和 wall time(经过的实时)会更长。

IO 绑定分析

另一方面,在执行 IO 绑定操作时,期望 ThreadPoolExecutorProcessPoolExecutor 性能更高。

Python 的线程是真实的,操作系统,线程。操作系统可以让它们进入睡眠状态,并在它们的信息到达时重新唤醒它们。

最后的想法

我怀疑多处理在 Windows 上会更慢,因为 Windows 不支持分叉,所以每个新进程都需要时间来启动。

您可以在多个进程中嵌套多个线程,但建议不要使用多个线程来拆分多个进程。

如果在 Python 中遇到繁重的处理问题,您可以通过额外的进程轻松扩展 - 但使用线程就不行了。

【讨论】:

ThreadPoolExecutor 是否绕过了 GIL 的限制?您也不需要 join() 来等待执行程序完成,或者这是否在上下文管理器中隐式处理 不,不,是“隐式处理” 出于某种原因,当扩大问题规模时,多线程处理速度非常快,但多处理会产生一堆卡住的进程(在 macOS 中)。知道为什么会这样吗?该过程仅包含嵌套循环和数学,没有任何异国情调。 @komodovaran_ 一个进程是一个完整的 Python 进程,每个进程一个,而一个线程只是一个执行线程,它有自己的堆栈,共享进程、它的字节码和它在内存中的所有其他内容所有其他线程 - 这有帮助吗? 感谢您提供了一个完整的示例【参考方案5】:

这是最简单的方法!

您可以使用 asyncio。 (可以找到文档here)。它被用作多个 Python 异步框架的基础,提供高性能网络和 Web 服务器、数据库连接库、分布式任务队列等。此外,它具有高级和低级 API 以适应任何类型的问题.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

现在这个函数将在调用时并行运行,而不会使主程序进入等待状态。您也可以使用它来并行化 for 循环。当调用 for 循环时,虽然循环是顺序的,但每次迭代都会在解释器到达主程序时并行运行。 例如:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

这会产生以下输出:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

【讨论】:

谢谢!我同意这是最简单的方法 想象你在 your_function() 中有不同的打印,有没有办法强制它执行所有打印然后传递到 for 循环中的下一个 i? 很好的例子,有没有办法在最终打印之前等待 - print('loop finished') 你有没有找到最后打印“循环完成”的方法? 请注意,使用asyncio 完全是浪费。 asyncio 的重点是高效地运行异步 (async/await) 代码,对于其他一切它只会增加开销。 .run_in_executor(None, ...) 只是封装了一个concurrent.futures 线程池,也可以直接使用。【参考方案6】:

使用Ray 有很多好处:

除了多核之外,您还可以在多台机器上进行并行处理(使用相同的代码)。 通过共享内存(和零拷贝序列化)有效处理数值数据。 分布式调度的高任务吞吐量。 容错。

在你的情况下,你可以启动 Ray 并定义一个远程函数

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

然后并行调用它

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

要在集群上运行相同的示例,唯一会改变的行是对 ray.init() 的调用。相关文档可以在here找到。

请注意,我正在帮助开发 Ray。

【讨论】:

对于任何考虑使用 ray 的人来说,知道它本身并不支持 Windows 可能是相关的。使用 WSL(适用于 Linux 的 Windows 子系统)的一些 hack 让它在 Windows 中工作是可能的,但如果你想使用 Windows,它几乎不是开箱即用的。 遗憾的是它还不支持 Python 3.9。【参考方案7】:

我发现joblib 对我很有用。请看下面的例子:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1:使用所有可用内核

【讨论】:

您知道,最好在发布自己的答案之前检查已经存在的答案。 This answer 也建议使用joblib【参考方案8】:

为什么不使用线程和一个互斥锁来保护一个全局列表?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

请记住,您将与最慢的线程一样快

【讨论】:

我知道这是一个非常古老的答案,所以突然得到一个随机的反对票是很糟糕的。我只投了反对票,因为线程不会并行化任何东西。由于全局解释器锁,Python 中的线程一次只能绑定到一个在解释器上执行的线程,因此它们支持concurrent programming, but not parallel OP 请求。 @skrrgwasme 我知道你知道这一点,但是当你使用“它们不会并行化任何东西”这个词时,这可能会误导读者。如果操作需要很长时间,因为它们是 IO 绑定的,或者在等待事件时处于休眠状态,那么解释器就会被释放以运行其他线程,因此这将导致人们在这些情况下所希望的速度提高。只有 CPU 绑定线程真正受到 skrrgwasme 所说的影响。【参考方案9】:

感谢@iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

【讨论】:

-1。这是一个仅限代码的答案。我建议添加一个解释,告诉读者您发布的代码是做什么的,也许他们可以在哪里找到更多信息。【参考方案10】:

短线期货;我很惊讶还没有人提到它。 . .

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

【讨论】:

【参考方案11】:

假设我们有一个异步函数

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

这需要在大型阵列上运行。一些属性被传递给程序,一些被数组中字典元素的属性使用。

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

【讨论】:

【参考方案12】:

这在用 Python 实现多处理和并行/分布式计算时可能很有用。

YouTube tutorial on using techila package

Techila 是一个分布式计算中间件,它使用 techila 包直接与 Python 集成。包中的 peach 函数可用于并行化循环结构。 (以下代码sn-p来自Techila Community Forums)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

【讨论】:

虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接的答案可能会失效。 @S.L.Barth 感谢您的反馈。我在答案中添加了一个小示例代码。【参考方案13】:

tqdm library 的 concurrent 包装器是并行化长时间运行代码的好方法。 tqdm 通过智能进度表提供当前进度和剩余时间的反馈,我发现这对于长时间计算非常有用。

循环可以重写为通过简单调用thread_map作为并发线程运行,或通过简单调用process_map作为并发多进程运行:

from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

【讨论】:

【参考方案14】:

看看这个;

http://docs.python.org/library/queue.html

这可能不是正确的方法,但我会做类似的事情;

实际代码;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

希望对您有所帮助。

【讨论】:

【参考方案15】:

并行处理的一个非常简单的例子是

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()

【讨论】:

这里的for循环没有并行性,你只是产生了一个运行整个循环的进程;这不是 OP 的意图。

以上是关于如何并行化一个简单的 Python 循环?的主要内容,如果未能解决你的问题,请参考以下文章

使用 numpy 数组和共享内存并行化 python 循环

如何并行化具有多个参数的简单 python def [重复]

使用 CUDA 在 python 中展开一个可并行化的 for 循环

在 Python 中并行化四个嵌套循环

如何使用 OpenMP 通过 C++ std::list 并行化 for 循环?

在 Python 中通过线程/核心/节点并行化 for 循环