将函数应用于 Dask 中的分组数据框:如何将分组的数据框指定为函数中的参数?

Posted

技术标签:

【中文标题】将函数应用于 Dask 中的分组数据框:如何将分组的数据框指定为函数中的参数?【英文标题】:Apply function to grouped data frame in Dask: How do you specify the grouped Dataframe as argument in the function? 【发布时间】:2018-08-27 15:46:32 【问题描述】:

我有一个按索引 (first_name) 分组的 dask dataframe

import pandas as pd
import numpy as np

from multiprocessing import cpu_count

from dask import dataframe as dd
from dask.multiprocessing import get 
from dask.distributed import Client


NCORES = cpu_count()
client = Client()

entities = pd.DataFrame('first_name':['Jake','John','Danae','Beatriz', 'Jacke', 'Jon'],'last_name': ['Del Toro', 'Foster', 'Smith', 'Patterson', 'Toro', 'Froster'], 'ID':['X','U','X','Y', '12','13'])

df = dd.from_pandas(entities, npartitions=NCORES)
df = client.persist(df.set_index('first_name'))

(显然entities在现实生活中是几千行)

我想将用户定义的函数应用于每个分组的数据帧。我想将每一行与组中的所有其他行进行比较(类似于Pandas compare each row with all rows in data frame and save results in list for each row)。

以下是我尝试应用的功能:

def contraster(x, DF):
    matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis = 1) 
    return [i for i, x in enumerate(matches) if x]

对于测试entities 数据框,您可以照常应用该函数:

entities.apply(lambda row: contraster(row['last_name'], entities), axis =1)

而预期的结果是:

Out[35]: 
0    [0, 4]
1    [1, 5]
2       [2]
3       [3]
4    [0, 4]
5    [1, 5]
dtype: object

entities 很大时,解决方案是使用dask。注意contraster函数中的DF必须是分组数据框。

我正在尝试使用以下内容:

df.groupby('first_name').apply(func=contraster, args=????)

但我应该如何指定分组数据框(即contraster 中的DF?)

【问题讨论】:

嗨,你能提供entities的样本吗? 我刚刚编辑了这个问题。谢谢@mortysporty 你好。您能否澄清一下...当您按名字分组时。这样做的目的是什么?例如,如果您有 1000 个名为 Jane 的人,他们的姓氏不同且相似,您希望得到什么输出?您想比较名字相同和姓氏相似的每个人吗? 我试图解决的问题是“重复数据删除”一种特殊类型的“记录链接”。将所有行与以二次方增长的所有行进行比较。所以是不可行的。标准方法是“阻塞”,即将记录分成块,只在块内进行比较。为了这个问题,阻止一个确切的列是一种简化。 你能让实体成为全局变量吗?那么你在使用apply的时候就不需要传递任何东西了。 【参考方案1】:

您提供给 groupby-apply 的函数应将 Pandas 数据帧或系列作为输入,理想情况下返回一个(或标量值)作为输出。额外的参数很好,但它们应该是次要的,而不是第一个参数。这在 Pandas 和 Dask 数据帧中都是一样的。

def func(df, x=None):
    # do whatever you want here
    # the input to this function will have all the same first name
    return pd.DataFrame('x': [x] * len(df),
                         'count': len(df),
                         'first_name': df.first_name)

然后您可以正常调用 df.groupby

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame('first_name':['Alice', 'Alice', 'Bob'],
                   'last_name': ['Adams', 'Jones', 'Smith'])

ddf = dd.from_pandas(df, npartitions=2)

ddf.groupby('first_name').apply(func, x=3).compute()

这将在 pandas 或 dask.dataframe 中产生相同的输出

   count first_name  x
0      2      Alice  3
1      2      Alice  3
2      1        Bob  3

【讨论】:

如果我尝试运行您的示例,我会收到以下错误:--------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-2-d3e0176b6e3f> in <module>() 7 ddf = dd.from_pandas(df, npartitions=2) 8 ----> 9 ddf.groupby('first_name').apply(func, x=3).compute() TypeError: apply() got an unexpected keyword argument 'x' 尝试升级到较新版本的 dask 我正在运行最后一个版本的 dask (0.17.2) 以上对我来说在 0.17.2 上运行良好。您可以使用导致失败的最小环境提出问题。 我已经验证这适用于全新安装。我用conda install -n myenv dask=0.17.2 ipython 创建了一个新环境并得到了想要的结果。【参考方案2】:

稍微猜测一下,我认为以下就是您所追求的。

def mapper(d):

    def contraster(x, DF=d):
        matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis = 1)
        return [d.ID.iloc[i] for i, x in enumerate(matches) if x]
    d['out'] = d.apply(lambda row: 
        contraster(row['last_name']), axis =1)
    return d

df.groupby('first_name').apply(mapper).compute()

应用于您的数据,您会得到:

   ID first_name  last_name   out
2   X      Danae      Smith   [X]
4  12      Jacke       Toro  [12]
0   X       Jake   Del Toro   [X]
1   U       John     Foster   [U]
5  13        Jon    Froster  [13]
3   Y    Beatriz  Patterson   [Y]

即,因为您按 first_name 分组,所以每个组仅包含一个项目,该项目仅与自身匹配。

但是,如果您在多行中有一些 first_name 值,您将获得匹配项:

entities = pd.DataFrame(
    'first_name':['Jake','Jake', 'Jake', 'John'],
     'last_name': ['Del Toro', 'Toro', 'Smith'
                   'Froster'],
     'ID':['Z','U','X','Y'])

输出:

  ID first_name last_name     out
0  Z       Jake  Del Toro  [Z, U]
1  U       Jake      Toro  [Z, U]
2  X       Jake     Smith     [X]
3  Y       John   Froster     [Y]

如果您不需要 first_name 上的 exact 匹配项,那么您可能需要按 first_name 排序/设置索引,并以类似的方式使用 map_partitions。在这种情况下,您需要修改您的问题。

【讨论】:

谢谢!,这是正确的答案。很抱歉我已经分配了赏金。

以上是关于将函数应用于 Dask 中的分组数据框:如何将分组的数据框指定为函数中的参数?的主要内容,如果未能解决你的问题,请参考以下文章

如何将多个功能应用于dask数据帧的多个块?

使用 r,我如何将不是基本聚合函数的函数 (moments::skewness) 应用于分组表?

如何使用 Groupby 将 Pandas TA 应用于数据框

如何快速将数据框中的时间列分组为间隔?

如何将数据框从长转换为宽,索引中的值按年份分组?

有效地将函数并行应用于分组的 pandas DataFrame