如何在 python 中使用 Spark Data frame 和 GroupBy 派生 Percentile
Posted
技术标签:
【中文标题】如何在 python 中使用 Spark Data frame 和 GroupBy 派生 Percentile【英文标题】:How to derive Percentile using Spark Data frame and GroupBy in python 【发布时间】:2016-05-03 09:12:14 【问题描述】:我有一个包含 Date
、Group
和 Price
列的 Spark 数据框。
我正在尝试为其中的 Price
列导出 percentile(0.6)
Python中的数据框。此外,我需要将输出添加为新列。
我试过下面的代码:
perudf = udf(lambda x: x.quantile(.6))
df1 = df.withColumn("Percentile", df.groupBy("group").agg("group"),perudf('price'))
但它抛出以下错误:
assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
AssertionError: all exprs should be Column
【问题讨论】:
【参考方案1】:您可以使用 sql 使用“percentile_approx”。在 pyspark 中创建 UDF 很困难。
其他详情请参考此链接:https://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%3CCALte62wQV68D6J87EVq6AD5-T3D0F3fHjuzs+1C5aCHOUUQS8w@mail.gmail.com%3E
【讨论】:
对于那些感兴趣/懒惰的人来说,from pyspark import SparkContext, HiveContext; sc = SparkContext(); hiveContext = HiveContext(sc); hiveContext.registerDataFrameAsTable(df, "df"); hiveContext.sql("SELECT percentile(price, 0.75) FROM df");
可以得到第 75 个百分位的价格。【参考方案2】:
您可以使用窗口函数,只需定义一个聚合窗口(您的案例中的所有数据),然后按百分位值过滤:
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank
w = Window.orderBy(df.price)
df.select('price', percent_rank().over(w).alias("percentile"))\
.where('percentile == 0.6').show()
percent_rank
在pyspark.sql.functions 中可用
如果您愿意,可以使用databricks post 中的 SQL 接口
【讨论】:
我发现databricks 帖子很有用,谢谢!这是一个有效的链接:databricks.com/blog/2015/07/15/…【参考方案3】:我知道使用 RDD 获取每一行的百分位数的解决方案。首先,将您的 RDD 转换为 DataFrame:
# convert to rdd of dicts
rdd = df.rdd
rdd = rdd.map(lambda x: x.asDict())
然后,您可以计算每一行的百分位数:
column_to_decile = 'price'
total_num_rows = rdd.count()
def add_to_dict(_dict, key, value):
_dict[key] = value
return _dict
def get_percentile(x, total_num_rows):
_dict, row_number = x
percentile = x[1] / float(total_num_rows)
return add_to_dict(_dict, "percentile", percentile)
rdd_percentile = rdd.map(lambda d: (d[column_to_decile], d)) # make column_to_decile a key
rdd_percentile = rdd_percentile.sortByKey(ascending=False) # so 1st decile has largest
rdd_percentile = rdd_percentile.map(lambda x: x[1]) # remove key
rdd_percentile = rdd_percentile.zipWithIndex() # append row number
rdd_percentile = rdd_percentile.map(lambda x: get_percentile(x, total_num_rows))
最后,转换回 DataFrame:
df = sqlContext.createDataFrame(rdd_percentile)
要获得与 0.6 最接近的百分位数的行,您可以执行以下操作:
from pyspark.sql.types import *
from pyspark.sql.functions import udf
def get_row_with_percentile(df, percentile):
func = udf(lambda x: abs(x), DoubleType())
df_distance = df.withColumn("distance", func(df['percentile'] - percentile))
min_distance = df_distance.groupBy().min('distance').collect()[0]['min(distance)']
result = df_distance.filter(df_distance['distance'] == min_distance)
result.drop("distance")
return result
get_row_with_percentile(df, 0.6).show()
【讨论】:
以上是关于如何在 python 中使用 Spark Data frame 和 GroupBy 派生 Percentile的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区
python core-spark-aggregating-data.py
python core-spark-filtering-data.py