如何在 Python 中进行并行编程?

Posted

技术标签:

【中文标题】如何在 Python 中进行并行编程?【英文标题】:How to do parallel programming in Python? 【发布时间】:2013-12-31 04:00:11 【问题描述】:

对于C++,我们可以使用OpenMP进行并行编程;但是,OpenMP 不适用于 Python。如果我想并行我的 python 程序的某些部分,我应该怎么做?

代码的结构可以认为是:

solve1(A)
solve2(B)

其中solve1solve2 是两个独立的函数。如何并行而不是顺序运行这种代码以减少运行时间? 代码是:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
            
        node1 = partition[0]
        node2 = partition[1]
    
        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

其中setinnersetouter 是两个独立的函数。这就是我要并行的地方...

【问题讨论】:

看看multiprocessing。注意:Python 的线程不适用于 CPU 密集型任务,仅适用于 I/O 密集型。 @9000 +100 互联网用于提及 CPU 与 I/O 相关任务。 @9000 据我所知,实际上线程根本不适合 CPU 密集型任务!在执行真正的 CPU 密集型任务时,进程是要走的路。 @OmarIthawi:为什么,如果你有很多 CPU 内核(现在像往常一样),线程可以正常工作。然后,您的进程可以运行多个线程并行加载所有这些内核在它们之间隐式共享公共数据(即,没有显式共享内存区域或进程间消息传递)。 @user2134774:嗯,是的,我的第二条评论毫无意义。可能只有发布 GIL 的 C 扩展可以从中受益;例如NumPy 和 Pandas 的一部分就是这样做的。在其他情况下,它是错误的(但我现在无法编辑它)。 【参考方案1】:

CPython 使用全局解释器锁,这使得并行编程比 C++ 更有趣

这个主题有几个有用的例子和挑战描述:

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?

【讨论】:

您称无法真正同时运行代码“有趣”? :-/【参考方案2】:

您可以使用multiprocessing 模块。对于这种情况,我可能会使用处理池:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

这将产生可以为您完成通用工作的进程。由于我们没有通过processes,它将为您机器上的每个 CPU 内核生成一个进程。每个 CPU 内核可以同时执行一个进程。

如果您想将列表映射到单个函数,您可以这样做:

args = [A, B]
results = pool.map(solve1, args)

不要使用线程,因为GIL 会锁定对 python 对象的任何操作。

【讨论】:

pool.map 是否也接受字典作为参数?还是只有简单的列表? 只列出我认为的。但是你可以只传入 dict.items() 这将是一个键值元组列表 不幸的是,这以 `unhashable type: 'list'` 错误结束 除了我的最后一条评论:`dict.items()` 工作。出现错误,因为我必须更改对过程功能的变量洞察力的处理。不幸的是,错误消息不是很有帮助......所以:谢谢你的提示。 :-) 这里是什么超时?【参考方案3】:

这可以通过Ray 非常优雅地完成。

要并行化您的示例,您需要使用 @ray.remote 装饰器定义您的函数,然后使用 .remote 调用它们。

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

与multiprocessing 模块相比,它有许多优点。

    相同的代码将在多核机器和机器集群上运行。 进程通过shared memory and zero-copy serialization高效共享数据。 错误消息传播得很好。

    这些函数调用可以组合在一起,例如,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
    除了远程调用函数之外,还可以将类远程实例化为actors。

请注意,Ray 是我一直在帮助开发的框架。

【讨论】:

尝试在 python 中安装软件包时,我不断收到一条错误消息,提示“找不到满足要求 ray 的版本(来自版本:)找不到 ray 的匹配分发” 一般这种错误表示需要升级pip。我建议尝试pip install --upgrade pip。如果您需要使用sudo,那么您用于安装raypip 版本可能与升级的版本不同。您可以通过pip --version 查询。此外,Windows 当前不受支持,因此如果您使用的是 Windows,这可能是问题所在。 请注意,这主要用于在多台机器上分配并发作业。 它实际上是针对单机情况和集群设置进行了优化。许多设计决策(例如,共享内存、零拷贝序列化)都旨在很好地支持单机。 如果文档能进一步指出这一点,那就太好了。我从阅读文档中了解到它并不是真正适用于单个机器案例的。【参考方案4】:

正如其他人所说,解决方案是使用多个进程。然而,哪种框架更合适取决于许多因素。除了已经提到的,还有charm4py和mpi4py(我是charm4py的开发者)。

有一种比使用工作池抽象更有效的方法来实现上述示例。在 1000 次迭代中,主循环一遍又一遍地向工作人员发送相同的参数(包括完整的图 G)。由于至少有一个工作人员将驻留在不同的进程上,因此这涉及将参数复制并发送到其他进程。根据对象的大小,这可能非常昂贵。相反,让工作人员存储状态并简单地发送更新的信息是有意义的。

例如,在 charm4py 中可以这样:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

请注意,对于此示例,我们实际上只需要一名工人。主循环可以执行其中一个功能,并让工作人员执行另一个。但我的代码有助于说明几件事:

    Worker A 在进程 0 中运行(与主循环相同)。当result_a.get() 被阻塞等待结果时,worker A 在同一个进程中进行计算。 参数通过引用自动传递给工人 A,因为它在同一个 过程(不涉及复制)。

【讨论】:

【参考方案5】:

在某些情况下,可以使用 Numba 自动并行化循环,尽管它只适用于 Python 的一小部分:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

不幸的是,Numba 似乎只适用于 Numpy 数组,但不适用于其他 Python 对象。理论上也可以先compile Python to C++再automatically parallelize it using the Intel C++ compiler,虽然我还没试过。

【讨论】:

【参考方案6】:

您可以使用joblib 库进行并行计算和多处理。

from joblib import Parallel, delayed

您可以简单地创建一个您希望并行运行的函数foo,并基于以下代码实现并行处理:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

其中num_cores可以从multiprocessing库中获取如下:

import multiprocessing

num_cores = multiprocessing.cpu_count()

如果您有一个具有多个输入参数的函数,并且您只想通过列表迭代其中一个参数,您可以使用functools 库中的partial 函数,如下所示:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

您可以通过几个示例here 找到对 python 和 R 多处理的完整解释。

【讨论】:

【参考方案7】:

我总是使用“多处理”本机库来处理 Python 中的并行性。为了控制队列中的进程数,我使用共享变量作为计数器。在以下示例中,您可以看到简单流程的并行执行是如何工作的。您需要安装的唯一库是“coloredlogs”。

代码

# pip install coloredlogs==15.0.1

from multiprocessing import Pool, Manager, Value, cpu_count
from datetime import datetime
import coloredlogs
import logging
import time
import sys

LOG_LEVEL = "DEBUG"


def setup_logger(name: str = __name__, level: str = LOG_LEVEL) -> logging.Logger:
    assert level in ["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

    logging.basicConfig(
        format="%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level,
        handlers=[logging.StreamHandler()]
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


def execute_process(name: str, queue: Value) -> None:
    logger = setup_logger()
    logger.info(f"Executing process: name...")
    time.sleep(5)
    queue.value -= 1


def create_processes(processes_names: [str], n_jobs: int = -1, waiting_time: int = 1) -> None:
    logger = setup_logger()

    if n_jobs <= 0:
        n_jobs = cpu_count()

    manager = Manager()
    pool = Pool(processes=n_jobs)
    queue = manager.Value('i', 0)
    lock = manager.Lock()
    start_time = datetime.now()

    with lock:  # Protecting the processes' queue shared variable.
        for name in processes_names:
            while True:
                if queue.value < n_jobs:
                    queue.value += 1

                    # Creating processes in parallel:
                    pool.apply_async(
                        func=execute_process,
                        args=(name, queue)
                    )

                    break
                else:
                    logger.debug(f"Pool full (n_jobs): waiting waiting_time seconds...")
                    time.sleep(waiting_time)

    pool.close()
    pool.join()

    exec_time = datetime.now() - start_time
    logger.info(f"Execution time: exec_time")


if __name__ == '__main__':
    processes_names = ["A", "B", "C", "D", "E", "F"]
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.

    # Creating and executing processes in parallel:
    create_processes(processes_names=processes_names, n_jobs=n_jobs)

执行与输出

user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934

【讨论】:

以上是关于如何在 Python 中进行并行编程?的主要内容,如果未能解决你的问题,请参考以下文章

全局变量如何在 Python 并行编程中工作?

python并行编程

Python并行编程的几个要点

Python并行编程:基于线程的并行

JVM是如何进行多线程并行编程的

40 Python - python并行编程 并行编程概述