在 Spark SQL 中查找多个双数据类型列的中位数

Posted

技术标签:

【中文标题】在 Spark SQL 中查找多个双数据类型列的中位数【英文标题】:Find median in spark SQL for multiple double datatype columns 【发布时间】:2016-12-30 23:38:27 【问题描述】:

我需要找到多个双数据类型列的中位数。请求建议以找到正确的方法。

下面是我的一列示例数据集。我希望样本的中值返回为 1。

  scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+

我尝试了以下选项

1) Hive UDAF 百分位数,它仅适用于 BigInt。

2) Hive UDAT percentile_approx,但它没有按预期工作(返回 0.25 vs 1)。

sqlContext.sql("select percentile_approx(num,0.5) from test").show();

+----+
| _c0|
+----+
|0.25|
+----+

3) Spark 窗口函数 percent_rank- 查找中位数我看到的方法是查找所有高于 0.5 的 percent_rank 并选择 max percent_rank 的相应 num 值。但它并非在所有情况下都有效,尤其是当我有记录计数时,在这种情况下,中位数是排序分布中中间值的平均值。

同样在 percent_rank 中,因为我必须找到多列的中位数,所以我必须在不同的数据帧中计算它,这对我来说是一个不太复杂的方法。如果我的理解不正确,请纠正我。

+---+-------------+
|num|percent_rank |
+---+-------------+
|0.0|0.0|
|0.0|0.0|
|1.0|0.4|
|1.0|0.4|
|1.0|0.4|
|1.0|0.4|
+---+---+

【问题讨论】:

【参考方案1】:

出于好奇,您使用的是哪个版本的 Apache Spark? Apache Spark 2.0+ 中有一些修复,其中包括对approxQuantile 的更改。

如果我要运行下面的 pySpark 代码 sn-p:

rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")

median 计算使用 approxQuantile 为:

df.approxQuantile("num", [0.5], 0.25)

spark.sql("select percentile_approx(num, 0.5) from df").show()

结果是:

Spark 2.0.0:0.25 Spark 2.0.1:1.0 Spark 2.1.0:1.0

请注意,因为这些是近似数字(通过approxQuantile),但通常这应该可以正常工作。如果您需要准确的中位数,一种方法是使用numpy.median。下面的代码 sn-p 是根据 gench 对How to find the median in Apache Spark with Python Dataframe API? 的 SO 响应为这个 df 示例更新的:

from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np

def find_median(values):
    try:
        median = np.median(values) #get the median of values in a list in each row
        return round(float(median),2)
    except Exception:
        return None #if there is anything wrong with the given values

median_finder = F.udf(find_median,FloatType())

df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))

# print out
df2.show()

输出:

+---+--------------------+------+
| id|                nums|median|
+---+--------------------+------+
|  1|[0.0, 0.0, 1.0, 1...|   1.0|
+---+--------------------+------+

更新:使用 RDD 的 Spark 1.6 Scala 版本

如果您使用的是 Spark 1.6,您可以通过 Eugene Zhulenev 的回复 How can I calculate the exact median with Apache Spark 使用 Scala 代码计算 median。下面是修改后的代码,适用于我们的示例。

import org.apache.spark.SparkContext._

  val rdd: RDD[Double] = sc.parallelize(Seq((0.0), (0.0), (1.0), (1.0), (1.0), (1.0)))

  val sorted = rdd.sortBy(identity).zipWithIndex().map 
    case (v, idx) => (idx, v)
  

  val count = sorted.count()

  val median: Double = if (count % 2 == 0) 
    val l = count / 2 - 1
    val r = l + 1
    (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
   else sorted.lookup(count / 2).head.toDouble

输出:

// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0

注意,这是使用 RDDs 计算准确的中位数 - 即您需要将 DataFrame 列转换为 RDD 才能执行此计算。

【讨论】:

感谢丹尼的指点。我正在为应用程序使用 Spark 1.6.0 和 Scala (2.10.5)。我认为 percentile_approx 和 approxQuantile 不在我的选择范围内。你知道任何 Scala Dataframe API,比如用于计算中位数的 numpy。 知道了 - 我将更新我的答案以包含一个通过 Scala 代码适用于 Spark 1.6 的答案。 越来越棘手了。在将数据与数据集中的其他列分组后,我需要找到多个双数据类型列的中位数。现在使用此信息编辑问题。感谢您的帮助。 很高兴能帮助您解决这个问题,但您可以使用与该问题对应的样本数据集创建一个新问题,而不是更改原始问题并将其标记为未回答? 将查询作为新问题发布。 ***.com/questions/41431270/…

以上是关于在 Spark SQL 中查找多个双数据类型列的中位数的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark.read.format("com.crealytics.spark.excel") 的 inferSchema 推断日期类型列的双精度

获取 Apache spark 数据集中包含的列的列数据类型

Spark SQL讲解

检查列数据类型并仅对 Spark SQL 中的整数和小数执行 SQL

如何提高具有数组列的 DataFrame 的 Spark SQL 查询性能?

更改 SQL Server DB 中列的数据类型时出错 - Java