为啥这个函数不并行?

Posted

技术标签:

【中文标题】为啥这个函数不并行?【英文标题】:Why is this function not paralleled?为什么这个函数不并行? 【发布时间】:2022-01-12 08:09:40 【问题描述】:

我有一个数据框 df2,它是 df 的副本。对于 col_2 列中的每个唯一值 c。我想随机提取 col_2 中对应值为 c 的 2 行。如果可用行数小于 2,则提取所有行。然后我在batch 列中将选定的行从 1 标记到 2。

您能否解释一下为什么我的函数不能对列表['a', 'b', 'c'] 中的所有值执行这项工作。例如,我观察

这意味着函数没有实现 bc 的值。

import pandas as pd
import os
from multiprocessing import dummy
from random import sample
core = os.cpu_count()
P = dummy.Pool(processes = core)

data = np.array([(3, 'a'), (2, 'a'), (1, 'b'), (0, 'c'), (2, 'c'), (3, 'c')],
                dtype=[('col_1', 'i4'), ('col_2', 'U1')])
df = pd.DataFrame.from_records(data)
df['batch'] = 0
df2 = df.copy()

def func(c):
    idx = df.col_2 == c
    pop = list(df[idx].index)
    m = min(2, len(pop))
    r = list(sample(pop, m))    
    df2.loc[r, 'batch'] = list(range(1, m + 1, 1))
    
    
P.map(func, ['a', 'b', 'c'])
df2

【问题讨论】:

因为你不能在主进程和其他进程之间共享任何变量。 @Corralien 你的意思是变量df2 在线程之间共享?有没有办法通过并行化获得类似的结果?我的数据集很大,所以按顺序做很慢。 你的数据框有多大,有多少组? @Corralien 它有 32717928 行和 2193 个组。 【参考方案1】:

我不确定multiprocessing 是不是正确答案。保存下面的代码并执行它。我创建了一个包含 40,000,000 条记录和 2500 个组的 DataFrame。在此代码中,您有 2 个实现,用于多处理和单处理。

输出:

Dataframe: 40000000 records for 2500 groups
[MP] Elapsed time: 5.66 seconds
[SP] Elapsed time: 4.48 seconds
import pandas as pd
import numpy as np
import multiprocessing
import time

def func_mp(col, df):
    print(f"Group: col (len(df) records)")
    out = df.sample(n=2) if len(df) >= 2 else df
    out['batch'] = np.arange(0, len(out))
    return out

def func_sp(df):
    print(f"Group: df.name (len(df) records)")
    out = df.sample(n=2) if len(df) >= 2 else df
    out['batch'] = np.arange(0, len(out))
    return out

if __name__ == '__main__':
    N = 40000000
    col_1 = np.random.randint(1, 1000, N)
    col_2 = np.random.randint(0, 2500, N)
    df = pd.DataFrame('col_1': col_1, 'col_2': col_2)

    start = time.time()
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.starmap(func_mp, df.groupby('col_2'))
        out1 = pd.concat(data)
    end = time.time()
    timemp = end - start

    start = time.time()
    out2 = df.groupby('col_2', as_index=False).apply(func_sp)
    end = time.time()
    timesp = end - start

    print()
    print(f"Dataframe: len(df) records for len(df['col_2'].unique()) groups")
    print(f"[MP] Elapsed time: timemp:.2f seconds")
    print(f"[SP] Elapsed time: timesp:.2f seconds")

【讨论】:

我在笔记本电脑上运行您的代码。已经5分钟了,跑步还没有结束。我的笔记本电脑的 CPU 包含 6 个内核和 12 个线程。我想知道为什么代码在你的机器上运行只需要 6 秒。 你用示例玩具运行我的代码? 是的,我只是复制并运行它。 我刚刚记录了过程here。请看一下。 我不信任 Jupyter Notebook。请问可以从控制台运行脚本吗?

以上是关于为啥这个函数不并行?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Apache Spark 的功能不并行?

为啥 Haskell 中没有隐式并行性?

为啥我的线程函数中的局部变量会被其他线程中断?

为啥 pool.map 会删除数据操作?

为啥这个字谜函数不正确?

为啥这个函数不超过值 2^31?