为啥这个函数不并行?
Posted
技术标签:
【中文标题】为啥这个函数不并行?【英文标题】:Why is this function not paralleled?为什么这个函数不并行? 【发布时间】:2022-01-12 08:09:40 【问题描述】:我有一个数据框 df2
,它是 df
的副本。对于 col_2
列中的每个唯一值 c。我想随机提取 col_2
中对应值为 c 的 2 行。如果可用行数小于 2,则提取所有行。然后我在batch
列中将选定的行从 1 标记到 2。
您能否解释一下为什么我的函数不能对列表['a', 'b', 'c']
中的所有值执行这项工作。例如,我观察
这意味着函数没有实现 b
和 c
的值。
import pandas as pd
import os
from multiprocessing import dummy
from random import sample
core = os.cpu_count()
P = dummy.Pool(processes = core)
data = np.array([(3, 'a'), (2, 'a'), (1, 'b'), (0, 'c'), (2, 'c'), (3, 'c')],
dtype=[('col_1', 'i4'), ('col_2', 'U1')])
df = pd.DataFrame.from_records(data)
df['batch'] = 0
df2 = df.copy()
def func(c):
idx = df.col_2 == c
pop = list(df[idx].index)
m = min(2, len(pop))
r = list(sample(pop, m))
df2.loc[r, 'batch'] = list(range(1, m + 1, 1))
P.map(func, ['a', 'b', 'c'])
df2
【问题讨论】:
因为你不能在主进程和其他进程之间共享任何变量。 @Corralien 你的意思是变量df2
在线程之间共享?有没有办法通过并行化获得类似的结果?我的数据集很大,所以按顺序做很慢。
你的数据框有多大,有多少组?
@Corralien 它有 32717928 行和 2193 个组。
【参考方案1】:
我不确定multiprocessing
是不是正确答案。保存下面的代码并执行它。我创建了一个包含 40,000,000 条记录和 2500 个组的 DataFrame。在此代码中,您有 2 个实现,用于多处理和单处理。
输出:
Dataframe: 40000000 records for 2500 groups
[MP] Elapsed time: 5.66 seconds
[SP] Elapsed time: 4.48 seconds
import pandas as pd
import numpy as np
import multiprocessing
import time
def func_mp(col, df):
print(f"Group: col (len(df) records)")
out = df.sample(n=2) if len(df) >= 2 else df
out['batch'] = np.arange(0, len(out))
return out
def func_sp(df):
print(f"Group: df.name (len(df) records)")
out = df.sample(n=2) if len(df) >= 2 else df
out['batch'] = np.arange(0, len(out))
return out
if __name__ == '__main__':
N = 40000000
col_1 = np.random.randint(1, 1000, N)
col_2 = np.random.randint(0, 2500, N)
df = pd.DataFrame('col_1': col_1, 'col_2': col_2)
start = time.time()
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
data = pool.starmap(func_mp, df.groupby('col_2'))
out1 = pd.concat(data)
end = time.time()
timemp = end - start
start = time.time()
out2 = df.groupby('col_2', as_index=False).apply(func_sp)
end = time.time()
timesp = end - start
print()
print(f"Dataframe: len(df) records for len(df['col_2'].unique()) groups")
print(f"[MP] Elapsed time: timemp:.2f seconds")
print(f"[SP] Elapsed time: timesp:.2f seconds")
【讨论】:
我在笔记本电脑上运行您的代码。已经5分钟了,跑步还没有结束。我的笔记本电脑的 CPU 包含 6 个内核和 12 个线程。我想知道为什么代码在你的机器上运行只需要 6 秒。 你用示例玩具运行我的代码? 是的,我只是复制并运行它。 我刚刚记录了过程here。请看一下。 我不信任 Jupyter Notebook。请问可以从控制台运行脚本吗?以上是关于为啥这个函数不并行?的主要内容,如果未能解决你的问题,请参考以下文章