从并行化函数返回数据帧字典?
Posted
技术标签:
【中文标题】从并行化函数返回数据帧字典?【英文标题】:Returning a dictionary of dataframes from parallelized function? 【发布时间】:2021-05-08 10:49:31 【问题描述】:这里是 Python 和 GitHub/*** 新手,第一次尝试使用 joblib 和多处理来加快我在 Python 中的工作流程。
我定义了一个空的 OrderedDict 来存储由函数 (my_function) 生成的 DataFrame。该函数接受一个单独的 DataFrame 的列的元素,执行一些操作,并且应该返回(希望填充的)OrderedDict 和另一个 DataFrame。
请允许我提供一些伪代码来解释这一点:
from joblib import Parallel, delayed
from collections import OrderedDict
from tqdm import tqdm
import pandas as pd
import multiprocessing
my_dict = OrderedDict()
my_df = DataFrameofvalues
def my_function(k):
my_dict[k] = someoperationswithpandasresultinginDataFrames
my_df = someooperationswithpandas
return my_dict, my_df
num_cores = multiprocessing.cpu_count()
inputs = tqdm(my_df['my_column'])
if __name__ == '__main__':
my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)
这会导致以下错误:
File "<ipython-input-52-df771b916ba5>", line 8, in <module>
my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k) for k in inputs)
ValueError: too many values to unpack (expected 2)
我想我忽略了一些小问题,但我就是找不到。有人可以帮我看看吗?
我无法在网上找到太多关于如何弄清楚我的函数试图解包多少值,确切地说(我猜输入中的元素数量?)或者它是否给了我所有的 DataFrames应该立即进入 OrderedDict。
谢谢,非常感谢!
根据进一步的故障排除进行编辑:
我想我知道问题出在哪里:该函数正在迭代输入并简单地生成数据帧,然后它无法将其组合为它所期望的字典。我通过设置 inputs = tqdm(my_df.loc[0:1, 'my_column']) 来解决这个问题。当我这样做时它可以工作,但如果我将其设置为输入 = tqdm(my_df.loc[0:2, 'my_column']) 则无法解压。不过目前还没有解决方案。
【问题讨论】:
【参考方案1】:我相信它与语法有关。您没有以正确的方式为您的函数提供参数。您可以尝试将最后一行拆分成更小的部分,以确定哪一部分中断。
另外,这不是正确的列表理解:
(delayed(my_function)(k) for k in inputs)
也许你想要这个:
[delayed(my_function(k)) for k in inputs]
希望这对您有所帮助。祝你好运!
【讨论】:
谢谢乔普,我尝试了你的解决方案,它给了我一个 SyntaxError。我想我知道问题出在哪里:该函数正在迭代输入并简单地生成数据帧,然后它不能将它们组合成一个字典。我通过设置 inputs = tqdm(my_df.loc[0:1, 'my_column']) 解决了这个问题。当我这样做时它可以工作,但是如果我将它设置为 inputs = tqdm(my_df.loc[0:2, 'my_column']) 没错,我提出的解决方案并不完整。我正试图引导你朝着正确的方向前进。请谷歌如何在 Python 中使用函数参数。此外,您不能在一行中执行两个相邻的函数(例如,func1()func2()
),您需要将它们放在不同的行中。【参考方案2】:
想出了如何获得我想要并认为我会分享的东西。
以下sn-p的伪代码:
if __name__ == '__main__':
my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)
实际上是给了我一个数据框列表。我把它改成:
if __name__ == '__main__':
my_list = Parallel(n_jobs = num_cores)(delayed(my_function)(k for k in inputs)
for i in range(len(my_list)):
if len(results[i]) > 0:
my_list[i] = my_list[i].reset_index(drop = True)
my_dict[str(my_list[i].loc[0,'col1'])] = my_list[i]
现在返回一个数据帧字典。不完全是我一开始就在寻找的东西,但就我的目的而言,甚至更好。
【讨论】:
【参考方案3】:TL;DR:
#Python3
from multiprocessing import Process, Manager
from collections import OrderedDict
def update_dict(my_dict, key):
# Insert your DataFrame calculations here!
my_dict[key] = '1st df': 'result_df_1',
'2nd df': 'result_df_2'
return
if __name__ == "__main__":
# whatever your inputs are
inputs = [x for x in range(4)]
manager = Manager()
global_dict = manager.dict()
job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
_ = [p.start() for p in job]
_ = [p.join() for p in job]
[print(f"x") for x in global_dict.items()]
# N.B assumes numeric (sortable) keys:
# dictionary sorted by key -- OrderedDict(sorted(d.items()) also works
ordered_global_dict = OrderedDict(sorted(global_dict.items(), key=lambda t: t[0]))
print(ordered_global_dict.items())
# accessing the results dataframe from the dict
results_df_01 = ordered_global_dict[0]['results_df_1']
results_df_02 = ordered_global_dict[0]['results_df_2']
#output:
odict_items([(0, '1st df': 'result_df_1', '2nd df': 'result_df_2'), (1, '1st df': 'result_df_1', '2nd df': 'result_df_2'), (2, '1st df': 'result_df_1', '2nd df': 'result_df_2'), (3, '1st df': 'result_df_1', '2nd df': 'result_df_2')])
说明
好问题,尽管您要实现的目标有点模棱两可。例如,您从每个进程返回 my_df
,但错误地将 all 的输出数据帧分配给单个变量:
my_dict, my_df = Parallel(...
.
根据我的理解,我会回答,好像您需要my_function
来更新表单的全局字典:
key: secondary_key: <dataframe>
.
让我们一点一点地来看看。 这是我从related question on S.O 中找到的答案,我的答案基于:
在 (1) 启动每个进程和 (2) 必须跨多个进程复制 pandas.DataFrame (等)时,您会产生大量开销。如果您只需要并行填充字典,我建议您使用共享内存字典。如果没有键会被覆盖,那么这很容易,您不必担心锁。
这是他们提供的解决方案:
>>> from multiprocess import Process, Manager
>>>
>>> def f(d, x):
... d[x] = x**2
...
>>> manager = Manager()
>>> d = manager.dict()
>>> job = [Process(target=f, args=(d, i)) for i in range(5)]
>>> _ = [p.start() for p in job]
>>> _ = [p.join() for p in job]
>>> print d
0: 0, 1: 1, 2: 4, 3: 9, 4: 16
只需进行少量修改,我们就可以轻松地重新调整此代码的用途以供您使用。
def f(d, x):
d[x] = x**2
变成(使用你的变量名)。
def update_dict(my_dict, key):
# Insert your DataFrame calculations here!
my_dict[key] = '1st df': 'result_df_1',
'2nd df': 'result_df_2'
return
而剩下的代码变成:
inputs = [x for x in range(4)]
manager = Manager()
global_dict = manager.dict()
job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
_ = [p.start() for p in job]
_ = [p.join() for p in job]
[print(f"x") for x in global_dict.items()]
打印如下:
#python3.9
(1, '1st df': 'result_df_1', '2nd df': 'result_df_2')
(0, '1st df': 'result_df_1', '2nd df': 'result_df_2')
(2, '1st df': 'result_df_1', '2nd df': 'result_df_2')
(3, '1st df': 'result_df_1', '2nd df': 'result_df_2')
注意这里的字典是无序的。最后一步是让我们订购您的字典。假设您的键是整数,您可以使用:
from collections import OrderedDict
# dictionary sorted by key -- OrderedDict(sorted(d.items()) also works
ordered_global_dict = OrderedDict(sorted(global_dict.items())
【讨论】:
谢谢乔!我实际上正在考虑做其他事情,但这教会了我很多。 很高兴我能帮上忙以上是关于从并行化函数返回数据帧字典?的主要内容,如果未能解决你的问题,请参考以下文章