PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构

Posted

技术标签:

【中文标题】PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构【英文标题】:Groupby and UDF/UDAF in PySpark while maintaining DataFrame structure 【发布时间】:2018-10-28 13:02:18 【问题描述】:

我是 PySpark 的新手,正在为简单的数据框操作而苦苦挣扎。我有一个类似的数据框:

product    period     rating   product_Desc1   product_Desc2 ..... more columns 
a            1         60          foo              xx
a            2         70          foo              xx
a            3         59          foo              xx
b            1         50          bar              yy
b            2         55          bar              yy
c            1         90          foo bar          xy
c            2         100         foo bar          xy

我想按产品分组,添加列以计算评分的算术、几何和谐波平均值同时保持数据框中的其余列,这些列在每个产品中都是一致的。

我尝试通过结合内置函数和 UDF 来实现这一点。例如:

a_means = df.groupBy("product").agg(mean("rating").alias("a_mean")
g_means = df.groupBy("product").agg(udf_gmean("rating").alias("g_mean")

地点:

def g_mean(x):
  gm = reduce(mul,x)**(1/len(x))
  return gm

udf_gmean = udf(g_mean, FloatType())

然后,我会将 a_means 和 g_means 输出与产品上的原始数据框连接起来,并删除重复项。但是,此方法返回错误,对于 g_means,说明 groupBy 中不涉及“评级”,也不是用户定义的聚合函数....

我也尝试过使用 SciPy 的 gmean 模块,但我收到的错误消息指出 ufunc 'log' 不适合输入类型,尽管据我所知,所有评级列都是整数类型。

网站上有类似的问题,但我找不到任何东西似乎可以解决我遇到的这个问题。我真的很感激你的帮助,因为它让我发疯了!

在此先感谢,如果我提供的信息还不够,我今天应该能够迅速提供任何进一步的信息。

值得注意的是,为了提高效率,我无法像使用 Pandas 数据框那样简单地转换为 Pandas 和转换...而且我使用的是 Spark 2.2 并且无法更新!

【问题讨论】:

您在函数 g_mean 中所指的“评级”是什么,未定义 道歉;现在显示了我的代码中的实际功能。 Applying UDFs on GroupedData in PySpark (with functioning python example)的可能重复 这是一个重复的问题,虽然我被限制使用没有相同功能的 Spark 2.2。 【参考方案1】:

这样的事情怎么样

from pyspark.sql.functions import avg
df1 = df.select("product","rating").rdd.map(lambda x: (x[0],(1.0,x[1]*1.0))).reduceByKey(lambda x,y: (x[0]+y[0], x[1]*y[1])).toDF(['product', 'g_mean'])
gdf = df1.select(df1['product'],pow(df1['g_mean._2'],1.0/df1['g_mean._1']).alias("rating_g_mean"))
display(gdf)

+-------+-----------------+
|product|    rating_g_mean|
+-------+-----------------+
|      a|62.81071936240795|
|      b|52.44044240850758|
|      c|94.86832980505137|
+-------+-----------------+


df1 = df.withColumn("h_mean", 1.0/df["rating"])
hdf = df1.groupBy("product").agg(avg(df1["rating"]).alias("rating_mean"), (1.0/avg(df1["h_mean"])).alias("rating_h_mean"))
sdf = hdf.join(gdf, ['product'])
display(sdf)

+-------+-----------+-----------------+-----------------+
|product|rating_mean|    rating_h_mean|    rating_g_mean|
+-------+-----------+-----------------+-----------------+
|      a|       63.0|62.62847514743051|62.81071936240795|
|      b|       52.5|52.38095238095239|52.44044240850758|
|      c|       95.0|94.73684210526315|94.86832980505137|
+-------+-----------+-----------------+-----------------+


fdf = df.join(sdf, ['product'])
display(fdf.sort("product"))


+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+
|product|period|rating|product_Desc1|product_Desc2|rating_mean|    rating_h_mean|    rating_g_mean|
+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+
|      a|     3|    59|          foo|           xx|       63.0|62.62847514743051|62.81071936240795|
|      a|     2|    70|          foo|           xx|       63.0|62.62847514743051|62.81071936240795|
|      a|     1|    60|          foo|           xx|       63.0|62.62847514743051|62.81071936240795|
|      b|     2|    55|          bar|           yy|       52.5|52.38095238095239|52.44044240850758|
|      b|     1|    50|          bar|           yy|       52.5|52.38095238095239|52.44044240850758|
|      c|     2|   100|      foo bar|           xy|       95.0|94.73684210526315|94.86832980505137|
|      c|     1|    90|      foo bar|           xy|       95.0|94.73684210526315|94.86832980505137|
+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+

【讨论】:

这太棒了,很明显很有效,所以谢谢你......我实际上已经以不同的方式解决了这个问题......当我有机会时,我会发布作为答案。 酷,我有兴趣查看解决方案。有空请分享 其实想多了,我们不必去 RDD 可能可以使用产品属性的力量来计算 df 中使用 select 和 group by 我想的几何平均值【参考方案2】:

比上面使用 gapply 的方法稍微简单一些:

from spark_sklearn.group_apply import gapply
from scipy.stats.mstats import gmean
import pandas as pd

def g_mean(_, vals):
    gm = gmean(vals["rating"])
    return pd.DataFrame(data=[gm])

geoSchema = StructType().add("geo_mean", FloatType())

gMeans = gapply(df.groupby("product"), g_mean, geoSchema)

这将返回一个数据框,然后可以使用以下方法对其进行排序并连接到原始数据框:

df_withGeo = df.join(gMeans, ["product"])

对要添加到原始 DataFrame 的任何聚合类型函数列重复该过程...

【讨论】:

不错。看起来您并不期望您的数据变大,在这种情况下这要简单得多。 几点: 1. 您在g_mean 函数中对列名进行硬编码。所以你不能在全球范围内将它用于其他数据帧? 2. 您的函数使用 pandas 数据框,如果您的数据集非常小,这很好。它不适用于大型数据集。

以上是关于PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构的主要内容,如果未能解决你的问题,请参考以下文章

带有 UDF 的 PySpark 数据框

PySpark中pandas_udf的隐式模式?

如何在 Pyspark UDF 中返回双精度列表?

SparkSession 中的 udf 和 pyspark.sql.functions 中的 udf 有啥区别

Pyspark 中的 UDF 和 python 函数

pyspark 中的 Pandas UDF