Python Pandas 多处理无结果返回

Posted

技术标签:

【中文标题】Python Pandas 多处理无结果返回【英文标题】:Python Pandas multiprocessing no result return 【发布时间】:2021-11-22 16:52:41 【问题描述】:

我有一个df,你可以通过复制粘贴来获得:

import pandas as pd
from io import StringIO

df = """
  ValOption  RB test 
0       SLA  4  3       
1       AC   5  4           
2       SLA  5  5          
3       AC   2  4       
4       SLA  5  5         
5       AC   3  4          
6       SLA  4  3         

"""
df = pd.read_csv(StringIO(df.strip()), sep='\s+')

输出:

ValOption   RB  test
0     SLA   4   3
1     AC    5   4
2     SLA   5   5
3     AC    2   4
4     SLA   5   5
5     AC    3   4
6     SLA   4   3

然后我有 2 个函数来为此 df 构建新列:

def func1():
    df['r1']=df['test']+1
    return df['r1']

def func2():
    df['r2']=df['RB']+1
    return df['r2']

在我调用这两个函数之后:

func1()
func2()

输出:

ValOption   RB  test    r1  r2
0    SLA    4   3      4    5
1     AC    5   4      5    6
2     SLA   5   5      6    6
3     AC    2   4      5    3
4     SLA   5   5      6    6
5     AC    3   4      5    4
6     SLA   4   3      4    5

但是当我尝试使用多处理时,我无法获得新列:

import multiprocessing
if __name__ ==  '__main__':

    p1 = multiprocessing.Process(target=func1)
    p2 = multiprocessing.Process(target=func2)

    p1.start()
    p2.start()

    p1.join()
    p2.join()

输出:

ValOption   RB  test
0    SLA    4   3
1     AC    5   4
2    SLA    5   5
3     AC    2   4
4    SLA    5   5
5     AC    3   4
6    SLA    4   3

多处理没有返回函数中的值。有朋友可以帮忙吗?

【问题讨论】:

【参考方案1】:

好的,然后通过创建一个类来更改您的代码:

from multiprocessing import Process

class Test:
    def __init__(self, df):
        self.df = df
        
    def func1(self):
        df['r1'] = df['test']+1

    def func2(self):
        df['r2'] = df['RB']+1

p1 = Process(target=Test(df).func1())
p2 = Process(target=Test(df).func2())

p1.start()
p2.start()

p1.join()
p2.join()

这肯定会起作用

【讨论】:

【参考方案2】:

如果您使用multiprocessing.Pool 并将您的函数重写为更通用,您可以使用将输入映射到输出:

>>> def func(series):
...   return series + 1
... 
>>> with multiprocessing.Pool(2) as p:
...   dat = p.map(func, [df['test'].rename('r1'), df['RB'].rename('r2')])
... 

然后在并行处理之外,使用获得的结果修改数据帧,例如使用df.join():

>>> df.join(dat)
  ValOption  RB  test  r1  r2
0       SLA   4     3   4   5
1        AC   5     4   5   6
2       SLA   5     5   6   6
3        AC   2     4   5   3
4       SLA   5     5   6   6
5        AC   3     4   5   4
6       SLA   4     3   4   5

否则,获得结果的最佳选择是任务队列,请参阅bottom of the multiprocessing page 中的示例。同样,您希望在任务中进行计算,但不修改任何共享数据结构。在它们执行后,您可以再次将它们结合在一起。

更复杂的解决方案可能不得不求助于multiprocessing.Manager 子类,因为pandas 系列和数据框是不适合multiprocessing.sharedctypes.* 选项的复杂对象。

【讨论】:

【参考方案3】:

我猜您正在使用笔记本并且您正在尝试包含 if __name__ == '__main__': 的单元格?

如果是这样,只需在其外部运行该函数 - 就像这样:

import multiprocessing

p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)

p1.start()
p2.start()

p1.join()
p2.join()

或保留它,但在这种情况下,将其作为 python 文件执行。

【讨论】:

我在 linux 和 notebook 中都厌倦了这个,不工作【参考方案4】:

Pandas 数据帧绝对不是线程安全的。想一想,如果 func2 完成时,你在 func1 中途会发生什么! (而且 pandas 绝对不是原子的)。

幸运的是,multiprocessing 刚刚复制了变量并处理了副本(实际上它已经序列化变量并将其发送到子进程)。所以如果你想在多处理中工作,你可以采用这个工作流程:

将任务分解为步骤 工作函数采取步骤并计算结果 编译结果并将其应用回对象

查看multiprocessing.Pool 的一些教程,了解这是如何完成的。

【讨论】:

以上是关于Python Pandas 多处理无结果返回的主要内容,如果未能解决你的问题,请参考以下文章

Python+Pandas入门2——导出csv

Python Pandas hdfstore 的 select(where='') 返回不合格的结果

Python 多处理管理器在烧瓶 API 中使用时显示错误

Python多处理:如何创建x个进程并返回返回值

处理来自不同函数的多个返回数据集 python pandas

如何优雅处理多参数返回/无参数返回——std::optional