多处理的并行性几乎没有减少时间

Posted

技术标签:

【中文标题】多处理的并行性几乎没有减少时间【英文标题】:Parallelism by multiprocessing is barely reducing time 【发布时间】:2021-12-23 22:03:29 【问题描述】:

我使用 this 和 this 并行运行 2 个函数调用,但时间几乎没有改善。这是我的代码:

顺序:

from nltk import pos_tag

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

df1['pos'] = df1['txt'].apply(posify)  # ~15 seconds
df2['pos'] = df2['txt'].apply(posify)  # ~15 seconds
# Total Time: 30 seconds

平行:

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser, key_name, shared_dict):
    shared_dict[key_name] = ser.apply(posify)

manager = multiprocessing.Manager()
return_dict = manager.dict()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 'df1', return_dict))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 'df2', return_dict))
p2.start()
p1.join(), p2.join()
df1['pos'] = return_dict['df1']
df2['pos'] = return_dict['df2']
# Total Time: 27 seconds

我预计总时间约为 15 秒,但我得到了 27 秒。 如果有什么不同的话,我有一个 6 核(12 个逻辑)的 i7 2.6GHz CPU。

是否有可能在 15 秒左右达到目标?这是否与pos_tag 函数本身有关?


编辑:

我最终只是做了以下事情,现在是 15 秒:

with Pool(cpu_count()) as pool:
    df1['pos'] = pool.map(posify, df1['txt'])
    df2['pos'] = pool.map(posify, df2['txt'])

我认为这样行运行顺序,但它们中的每一个在内部并行运行。只要是 15 秒,我就可以。

【问题讨论】:

你有一个非常大的数据框吗? @BrutusForcus - 每行 9K 行。 【参考方案1】:

从进程传回数据的更常用方法是通过multiprocessing.Queue 实例。由于不知道数据帧数据的具体细节和处理结果,我无法量化从托管字典切换会提高多少性能,但使用队列应该会提高性能。

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser, which_df, q):
    # Pass back the results along with which dataframe the results are for:
    q.put((which_df, ser.apply(posify)))

q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 1, q))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 2, q))
p2.start()
# Get the results:
for _ in range(2):
    # Must do the gets before joing the processes!
    which_df, results = q.get()
    if which_df == 1:
        df1['pos'] = results
    else:
        # assert(which_df == 2)
        df2['pos'] = results
p1.join()
p2.join()

要使用多处理池:

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser):
    return ser.apply(posify)

pool = multiprocessing.Pool(2)
results1 = pool.apply_async(posify_parallel, args=(df1['txt'],))
results2 = pool.apply_async(posify_parallel, args=(df2['txt'],))
df1['pos'] = results1.get()
df2['pos'] = results2.get()

【讨论】:

谢谢我尝试了你的第二个选项,它让我得到了 21-22 秒 因此,将数据从一个地址空间(即进程)移动到另一个地址空间(即进程)总是会产生开销,而在非并行版本中您没有,这可能是一个只会消失的问题当原始数据在共享内存中时。我不能说,不熟悉nltk,您是否还有其他问题。

以上是关于多处理的并行性几乎没有减少时间的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Haskell 中没有隐式并行性?

make 并行性的系统限制

使用 OPTION (MAXDOP 1) 减少 SQL Server 中的并行性是不是安全?

有没有办法在这个 R 代码中进行并行处理?

多线程也不一定比单线程快

在本书的背景下,所谓的“细粒度并行”究竟意味着什么?