从并行化函数返回数据帧字典?

Posted

技术标签:

【中文标题】从并行化函数返回数据帧字典?【英文标题】:Returning a dictionary of dataframes from parallelized function? 【发布时间】:2021-05-08 10:49:31 【问题描述】:

这里是 Python 和 GitHub/*** 新手,第一次尝试使用 joblib 和多处理来加快我在 Python 中的工作流程。

我定义了一个空的 OrderedDict 来存储由函数 (my_function) 生成的 DataFrame。该函数接受一个单独的 DataFrame 的列的元素,执行一些操作,并且应该返回(希望填充的)OrderedDict 和另一个 DataFrame。

请允许我提供一些伪代码来解释这一点:

from joblib import Parallel, delayed
from collections import OrderedDict
from tqdm import tqdm

import pandas as pd
import multiprocessing

my_dict = OrderedDict()
my_df = DataFrameofvalues

def my_function(k):

  my_dict[k] = someoperationswithpandasresultinginDataFrames
  
  my_df = someooperationswithpandas
  
  return my_dict, my_df
  
num_cores = multiprocessing.cpu_count()
inputs = tqdm(my_df['my_column'])

if __name__ == '__main__':
  my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)

这会导致以下错误:

  File "<ipython-input-52-df771b916ba5>", line 8, in <module>
    my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k) for k in inputs)

ValueError: too many values to unpack (expected 2)

我想我忽略了一些小问题,但我就是找不到。有人可以帮我看看吗?

我无法在网上找到太多关于如何弄清楚我的函数试图解包多少值,确切地说(我猜输入中的元素数量?)或者它是否给了我所有的 DataFrames应该立即进入 OrderedDict。

谢谢,非常感谢!

根据进一步的故障排除进行编辑:

我想我知道问题出在哪里:该函数正在迭代输入并简单地生成数据帧,然后它无法将其组合为它所期望的字典。我通过设置 inputs = tqdm(my_df.loc[0:1, 'my_column']) 来解决这个问题。当我这样做时它可以工作,但如果我将其设置为输入 = tqdm(my_df.loc[0:2, 'my_column']) 则无法解压。不过目前还没有解决方案。

【问题讨论】:

【参考方案1】:

我相信它与语法有关。您没有以正确的方式为您的函数提供参数。您可以尝试将最后一行拆分成更小的部分,以确定哪一部分中断。

另外,这不是正确的列表理解:

(delayed(my_function)(k) for k in inputs)

也许你想要这个:

[delayed(my_function(k)) for k in inputs]

希望这对您有所帮助。祝你好运!

【讨论】:

谢谢乔普,我尝试了你的解决方案,它给了我一个 SyntaxError。我想我知道问题出在哪里:该函数正在迭代输入并简单地生成数据帧,然后它不能将它们组合成一个字典。我通过设置 inputs = tqdm(my_df.loc[0:1, 'my_column']) 解决了这个问题。当我这样做时它可以工作,但是如果我将它设置为 inputs = tqdm(my_df.loc[0:2, 'my_column']) 没错,我提出的解决方案并不完整。我正试图引导你朝着正确的方向前进。请谷歌如何在 Python 中使用函数参数。此外,您不能在一行中执行两个相邻的函数(例如,func1()func2()),您需要将它们放在不同的行中。【参考方案2】:

想出了如何获得我想要并认为我会分享的东西。

以下sn-p的伪代码:

if __name__ == '__main__':
  my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)

实际上是给了我一个数据框列表。我把它改成:

if __name__ == '__main__':
  my_list = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)

for i in range(len(my_list)):
   if len(results[i]) > 0:
        my_list[i] = my_list[i].reset_index(drop = True)            
        my_dict[str(my_list[i].loc[0,'col1'])] = my_list[i]

现在返回一个数据帧字典。不完全是我一开始就在寻找的东西,但就我的目的而言,甚至更好。

【讨论】:

【参考方案3】:

TL;DR:

#Python3

from multiprocessing import Process, Manager
from collections import OrderedDict


def update_dict(my_dict, key):
    # Insert your DataFrame calculations here!
    my_dict[key] = '1st df': 'result_df_1',
                    '2nd df': 'result_df_2'
    return


if __name__ == "__main__":
    # whatever your inputs are
    inputs = [x for x in range(4)]

    manager = Manager()
    global_dict = manager.dict()
    job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
    _ = [p.start() for p in job]
    _ = [p.join() for p in job]

    [print(f"x") for x in global_dict.items()]
    
    # N.B assumes numeric (sortable) keys:
    # dictionary sorted by key -- OrderedDict(sorted(d.items()) also works
    ordered_global_dict = OrderedDict(sorted(global_dict.items(), key=lambda t: t[0]))
    print(ordered_global_dict.items())

    # accessing the results dataframe from the dict
    results_df_01 = ordered_global_dict[0]['results_df_1']
    results_df_02 = ordered_global_dict[0]['results_df_2']

#output:
odict_items([(0, '1st df': 'result_df_1', '2nd df': 'result_df_2'), (1, '1st df': 'result_df_1', '2nd df': 'result_df_2'), (2, '1st df': 'result_df_1', '2nd df': 'result_df_2'), (3, '1st df': 'result_df_1', '2nd df': 'result_df_2')])


说明

好问题,尽管您要实现的目标有点模棱两可。例如,您从每个进程返回 my_df,但错误地将 all 的输出数据帧分配给单个变量: my_dict, my_df = Parallel(....

根据我的理解,我会回答,好像您需要my_function 来更新表单的全局字典: key: secondary_key: &lt;dataframe&gt;.

让我们一点一点地来看看。 这是我从related question on S.O 中找到的答案,我的答案基于:

在 (1) 启动每个进程和 (2) 必须跨多个进程复制 pandas.DataFrame (等)时,您会产生大量开销。如果您只需要并行填充字典,我建议您使用共享内存字典。如果没有键会被覆盖,那么这很容易,您不必担心锁。

这是他们提供的解决方案:

>>> from multiprocess import Process, Manager
>>> 
>>> def f(d, x):
...   d[x] = x**2
... 
>>> manager = Manager()
>>> d = manager.dict()
>>> job = [Process(target=f, args=(d, i)) for i in range(5)]
>>> _ = [p.start() for p in job]
>>> _ = [p.join() for p in job]
>>> print d
0: 0, 1: 1, 2: 4, 3: 9, 4: 16

只需进行少量修改,我们就可以轻松地重新调整此代码的用途以供您使用。

def f(d, x):
    d[x] = x**2

变成(使用你的变量名)。

def update_dict(my_dict, key):
    # Insert your DataFrame calculations here!
    my_dict[key] = '1st df': 'result_df_1',
                    '2nd df': 'result_df_2'
    return
 

而剩下的代码变成:

inputs = [x for x in range(4)]

manager = Manager()
global_dict = manager.dict()
job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
_ = [p.start() for p in job]
_ = [p.join() for p in job]
[print(f"x") for x in global_dict.items()]

打印如下:

#python3.9
(1, '1st df': 'result_df_1', '2nd df': 'result_df_2')
(0, '1st df': 'result_df_1', '2nd df': 'result_df_2')
(2, '1st df': 'result_df_1', '2nd df': 'result_df_2')
(3, '1st df': 'result_df_1', '2nd df': 'result_df_2')

注意这里的字典是无序的。最后一步是让我们订购您的字典。假设您的键是整数,您可以使用:

from collections import OrderedDict
# dictionary sorted by key -- OrderedDict(sorted(d.items()) also works
ordered_global_dict = OrderedDict(sorted(global_dict.items())

【讨论】:

谢谢乔!我实际上正在考虑做其他事情,但这教会了我很多。 很高兴我能帮上忙

以上是关于从并行化函数返回数据帧字典?的主要内容,如果未能解决你的问题,请参考以下文章

从具有数据帧格式的函数返回两个数据帧

如何从power bi中的python函数返回单个数据帧

减少火花返回字典而不是数据帧中的操作

从并行for循环返回结果

在函数中返回不同的数据帧 - R

如何让 df.loc 从数据帧的特定单元格返回值(数字)?