与光线的并行化未按预期工作

Posted

技术标签:

【中文标题】与光线的并行化未按预期工作【英文标题】:Parallelization with ray not working as expected 【发布时间】:2022-01-21 16:56:12 【问题描述】:

我是并行处理的初学者,我目前正在尝试一个简单的程序来了解 Ray 的工作原理。

import numpy as np
import time
from pprint import pprint
import ray

ray.init(num_cpus = 4) # Specify this system has 4 CPUs.

data_rows = 800
data_cols = 10000
batch_size = int(data_rows/4)

# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 100, size=[data_rows, data_cols])
data = arr.tolist()

# Solution Without Paralleization
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

results = []

start = time.time()
for row in data:
    results.append(howmany_within_range(row, minimum=75, maximum=100))
end = time.time()

print("Without parallelization")
print("-----------------------")
pprint(results[:5])
print("Total time: ", end-start, "sec")


# Parallelization with ray
results = []
y = []
z = []
w = []

@ray.remote
def solve(data, minimum, maximum):
    count = 0
    count_row = 0
    for i in data:
        for n in i:
            if minimum <= n <= maximum:
               count = count + 1
        count_row = count
        count = 0    
    return count_row

start = time.time()
results = ray.get([solve.remote(data[i:i+1], 75, 100) for i in range(0, batch_size)])
y = ray.get([solve.remote(data[i:i+1], 75, 100) for i in range(1*batch_size, 2*batch_size)])
z = ray.get([solve.remote(data[i:i+1], 75, 100) for i in range(2*batch_size, 3*batch_size)])
w = ray.get([solve.remote(data[i:i+1], 75, 100) for i in range(3*batch_size, 4*batch_size)])
end = time.time()

results += y+z+w

print("With parallelization")
print("--------------------")
print(results[:5])
print("Total time: ", end-start, "sec")

Ray 的性能越来越慢:

$ python3 raytest.py 
Without parallelization
-----------------------
[2501, 2543, 2530, 2410, 2467]
Total time:  0.5162293910980225 sec
(solve pid=26294) 
With parallelization
--------------------
[2501, 2543, 2530, 2410, 2467]
Total time:  1.1760196685791016 sec

事实上,如果我放大输入数据,我会在终端中收到带有函数 pid 的消息,并且程序会停止。

本质上,我尝试将计算分成几批行,并将每个计算分配给一个 cpu 核心。我做错了什么?

【问题讨论】:

【参考方案1】:

在多处理(您的代码)方面存在两个主要问题

    与生成新进程来完成工作相关的开销。 在不同进程之间传输数据会产生开销。

为了产生一个新进程,python 解释器的一个新实例被创建并初始化(由于GIL)。同样,当您在进程之间传输数据时,必须在发送方/接收方对这些数据进行序列化/反序列化,这在您的程序中发生了两次(一次从主进程到工作人员,一次从工作人员到主进程。),所以在简而言之,您的程序正在花费所有时间来支付这些开销,而不是进行实际计算。

如果您想在 python 中利用多处理的好处,您应该使用尽可能少的数据传输在工作人员处完成更多计算,我通常确定使用多处理是否是一个好主意的方式是任务是否是在单个 cpu 上完成需要超过 5 秒。

另一个减少数据传输的好主意是将数组切片(多行)而不是每个函数调用单行,因为每一行都必须单独序列化,这会增加额外的开销。

【讨论】:

谢谢。你的意思是“在你的程序中发生了两次(一次从主进程到工作人员,一次从工作人员到主进程。),所以简而言之,你的程序正在花费所有时间来支付这个开销而不是做实际的计算。”?您能否向我解释一下这种转移发生在代码中的什么位置以及如何避免它? 函数输入参数和返回必须被传输,如果你想传输更多数据,还有其他方法可以在进程之间传输数据,比如队列,但是为了避免数据传输,你可以构造工作函数中的数据,而不是将其作为参数传递,例如,如果从磁盘读取数据,则可以让工作进程从磁盘而不是主进程读取,因此不必传输数据跨函数调用。

以上是关于与光线的并行化未按预期工作的主要内容,如果未能解决你的问题,请参考以下文章

如何在 python 的类中使用光线并行性?

为什么OpenMP没有并行化vtk IntersectWithLine代码

与 Rborist 并行化

通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?

“尴尬平行”的反义词是啥?

C++ 并行化库:OpenMP 与线程构建块 [关闭]