Finding percentiles dynamically in Spark-Scala
I am trying to compute percentiles over a column using a Window function, as shown below. I referenced here for the ApproxQuantile definition used on a group.
val df1 = Seq((1,10.0),(1,20.0),(1,40.6),(1,15.6),(1,17.6),(1,25.6),(1,39.6),(2,20.5),(2,70.3),(2,69.4),(2,74.4),(2,45.4),(3,60.6),(3,80.6),(4,30.6),(4,90.6)).toDF("ID","Count")
val idBucketMapping = Seq((1,4),(2,3),(3,2),(4,2)).toDF("ID","Bucket")
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, typedLit}
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.expressions.Window

object PercentileApprox {
  // Wrap the internal ApproximatePercentile expression so it can be used
  // like a regular Column function.
  def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
    val expr = new ApproximatePercentile(col.expr, percentage.expr, accuracy.expr).toAggregateExpression
    new Column(expr)
  }

  def percentile_approx(col: Column, percentage: Column): Column =
    percentile_approx(col, percentage, lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
}
import PercentileApprox._

// Lower bound of each of the `bucketSize` percentile buckets: 0, 1/n, ..., (n-1)/n
def doBucketing(bucketSize: Int): Seq[Double] =
  (1 until bucketSize).scanLeft(0d)((a, _) => a + 1 / bucketSize.toDouble)

val res = df1.withColumn("percentile",
  percentile_approx(col("Count"), typedLit(doBucketing(2))).over(Window.partitionBy("ID")))
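As a quick sanity check on the bucketing helper (plain Scala, no Spark needed): doBucketing(n) produces the n lower percentile bounds 0, 1/n, …, (n-1)/n that get passed to percentile_approx.

```scala
// Standalone copy of the helper: lower bound of each of `bucketSize` buckets.
def doBucketing(bucketSize: Int): Seq[Double] =
  (1 until bucketSize).scanLeft(0d)((a, _) => a + 1 / bucketSize.toDouble)

// doBucketing(4) == Seq(0.0, 0.25, 0.5, 0.75)
// doBucketing(2) == Seq(0.0, 0.5)
```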
scala> df1.show
+---+-----+
| ID|Count|
+---+-----+
| 1| 10.0|
| 1| 20.0|
| 1| 40.6|
| 1| 15.6|
| 1| 17.6|
| 1| 25.6|
| 1| 39.6|
| 2| 20.5|
| 2| 70.3|
| 2| 69.4|
| 2| 74.4|
| 2| 45.4|
| 3| 60.6|
| 3| 80.6|
| 4| 30.6|
| 4| 90.6|
+---+-----+
scala> idBucketMapping.show
+---+------+
| ID|Bucket|
+---+------+
| 1| 4|
| 2| 3|
| 3| 2|
| 4| 2|
+---+------+
scala> res.show
+---+-----+------------------+
| ID|Count| percentile|
+---+-----+------------------+
| 1| 10.0|[10.0, 20.0, 40.6]|
| 1| 20.0|[10.0, 20.0, 40.6]|
| 1| 40.6|[10.0, 20.0, 40.6]|
| 1| 15.6|[10.0, 20.0, 40.6]|
| 1| 17.6|[10.0, 20.0, 40.6]|
| 1| 25.6|[10.0, 20.0, 40.6]|
| 1| 39.6|[10.0, 20.0, 40.6]|
| 3| 60.6|[60.6, 60.6, 80.6]|
| 3| 80.6|[60.6, 60.6, 80.6]|
| 4| 30.6|[30.6, 30.6, 90.6]|
| 4| 90.6|[30.6, 30.6, 90.6]|
| 2| 20.5|[20.5, 69.4, 74.4]|
| 2| 70.3|[20.5, 69.4, 74.4]|
| 2| 69.4|[20.5, 69.4, 74.4]|
| 2| 74.4|[20.5, 69.4, 74.4]|
| 2| 45.4|[20.5, 69.4, 74.4]|
+---+-----+------------------+
So far so good, and the logic is simple. But I need the result computed dynamically: the argument doBucketing(2) to this function should be picked from idBucketMapping based on the ID value.
This seems a bit tricky to me. Is it possible?
Expected output, where the percentile buckets are driven by the idBucketMapping dataframe:
+---+-----+------------------------+
|ID |Count|percentile |
+---+-----+------------------------+
|1 |10.0 |[10.0, 15.6, 20.0, 39.6]|
|1 |20.0 |[10.0, 15.6, 20.0, 39.6]|
|1 |40.6 |[10.0, 15.6, 20.0, 39.6]|
|1 |15.6 |[10.0, 15.6, 20.0, 39.6]|
|1 |17.6 |[10.0, 15.6, 20.0, 39.6]|
|1 |25.6 |[10.0, 15.6, 20.0, 39.6]|
|1 |39.6 |[10.0, 15.6, 20.0, 39.6]|
|3 |60.6 |[60.6, 60.6] |
|3 |80.6 |[60.6, 60.6] |
|4 |30.6 |[30.6, 30.6] |
|4 |90.6 |[30.6, 30.6] |
|2 |20.5 |[20.5, 45.4, 70.3] |
|2 |70.3 |[20.5, 45.4, 70.3] |
|2 |69.4 |[20.5, 45.4, 70.3] |
|2 |74.4 |[20.5, 45.4, 70.3] |
|2 |45.4 |[20.5, 45.4, 70.3] |
+---+-----+------------------------+
Answer
percentile_approx takes a percentage and an accuracy. It appears that both of them must be constant literals, so we cannot compute percentile_approx at runtime with a dynamically calculated percentage or accuracy.
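One workaround, sketched here in plain Scala (the helper names nearestRank and percentilesPerId are hypothetical, not part of Spark): since the groups here are small, exact per-ID percentiles can be computed after grouping, looking the bucket size up per ID. With a nearest-rank definition this reproduces the expected output above; Spark's percentile_approx is approximate and may differ slightly.

```scala
// Lower bound of each of the `bucketSize` buckets, as in the question.
def doBucketing(bucketSize: Int): Seq[Double] =
  (1 until bucketSize).scanLeft(0d)((a, _) => a + 1 / bucketSize.toDouble)

// Nearest-rank percentile: the value at 1-based rank ceil(p * n),
// with p = 0 mapping to the minimum.
def nearestRank(sorted: Vector[Double], p: Double): Double = {
  val idx = math.max(0, math.ceil(p * sorted.size).toInt - 1)
  sorted(idx)
}

// For each ID, sort its counts and evaluate the percentages derived
// from that ID's bucket size.
def percentilesPerId(data: Seq[(Int, Double)],
                     bucketOf: Map[Int, Int]): Map[Int, Seq[Double]] =
  data.groupBy(_._1).map { case (id, rows) =>
    val sorted = rows.map(_._2).sorted.toVector
    id -> doBucketing(bucketOf(id)).map(nearestRank(sorted, _))
  }

// With df1's data and idBucketMapping as a Map, e.g.:
// percentilesPerId(data, Map(1 -> 4, 2 -> 3, 3 -> 2, 4 -> 2))
// yields 1 -> Seq(10.0, 15.6, 20.0, 39.6), 3 -> Seq(60.6, 60.6), etc.
```

Staying inside Spark, another option along the same lines is to run percentile_approx once per distinct bucket size (each call then keeps a constant literal percentage array) and union the per-bucket-size results back together.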