Python Pandas 多处理无结果返回
Posted
技术标签:
【中文标题】Python Pandas 多处理无结果返回【英文标题】:Python Pandas multiprocessing no result return 【发布时间】:2021-11-22 16:52:41 【问题描述】:我有一个df,你可以通过复制粘贴来获得:
import pandas as pd
from io import StringIO
df = """
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
"""
df = pd.read_csv(StringIO(df.strip()), sep='\s+')
输出:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
然后我有 2 个函数来为此 df 构建新列:
def func1():
df['r1']=df['test']+1
return df['r1']
def func2():
df['r2']=df['RB']+1
return df['r2']
在我调用这两个函数之后:
func1()
func2()
输出:
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
但是当我尝试使用多处理时,我无法获得新列:
import multiprocessing
if __name__ == '__main__':
p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)
p1.start()
p2.start()
p1.join()
p2.join()
输出:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
多处理没有返回函数中的值。有朋友可以帮忙吗?
【问题讨论】:
【参考方案1】:好的,然后通过创建一个类来更改您的代码:
from multiprocessing import Process
class Test:
def __init__(self, df):
self.df = df
def func1(self):
df['r1'] = df['test']+1
def func2(self):
df['r2'] = df['RB']+1
p1 = Process(target=Test(df).func1())
p2 = Process(target=Test(df).func2())
p1.start()
p2.start()
p1.join()
p2.join()
这肯定会起作用
【讨论】:
【参考方案2】:如果您使用multiprocessing.Pool
并将您的函数重写为更通用,您可以使用将输入映射到输出:
>>> def func(series):
... return series + 1
...
>>> with multiprocessing.Pool(2) as p:
... dat = p.map(func, [df['test'].rename('r1'), df['RB'].rename('r2')])
...
然后在并行处理之外,使用获得的结果修改数据帧,例如使用df.join()
:
>>> df.join(dat)
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
否则,获得结果的最佳选择是任务队列,请参阅bottom of the multiprocessing page 中的示例。同样,您希望在任务中进行计算,但不修改任何共享数据结构。在它们执行后,您可以再次将它们结合在一起。
更复杂的解决方案可能不得不求助于multiprocessing.Manager
子类,因为pandas 系列和数据框是不适合multiprocessing.sharedctypes.*
选项的复杂对象。
【讨论】:
【参考方案3】:我猜您正在使用笔记本并且您正在尝试包含 if __name__ == '__main__':
的单元格?
如果是这样,只需在其外部运行该函数 - 就像这样:
import multiprocessing
p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)
p1.start()
p2.start()
p1.join()
p2.join()
或保留它,但在这种情况下,将其作为 python 文件执行。
【讨论】:
我在 linux 和 notebook 中都厌倦了这个,不工作【参考方案4】:Pandas 数据帧绝对不是线程安全的。想一想,如果 func2 完成时,你在 func1 中途会发生什么! (而且 pandas 绝对不是原子的)。
幸运的是,multiprocessing 刚刚复制了变量并处理了副本(实际上它已经序列化变量并将其发送到子进程)。所以如果你想在多处理中工作,你可以采用这个工作流程:
将任务分解为步骤 工作函数采取步骤并计算结果 编译结果并将其应用回对象查看multiprocessing.Pool
的一些教程,了解这是如何完成的。
【讨论】:
以上是关于Python Pandas 多处理无结果返回的主要内容,如果未能解决你的问题,请参考以下文章
Python Pandas hdfstore 的 select(where='') 返回不合格的结果