Spark SQL functions.scala 源码解析Aggregate functions(基于 Spark 3.3.0)

Posted Shockang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL functions.scala 源码解析Aggregate functions(基于 Spark 3.3.0)相关的知识,希望对你有一定的参考价值。

前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

目录

Spark SQL functions.scala 源码解析(一)Sort functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(二)Aggregate functions(基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(三)Window functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(四)Non-aggregate functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(五)Math Functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(六)Misc functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(七)String functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(八)DateTime functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(九)Collection functions (基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(十)Partition transform functions(基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(十一)Scala UDF functions(基于 Spark 3.3.0)

Spark SQL functions.scala 源码解析(十二)Java UDF functions(基于 Spark 3.3.0)

正文

approx_count_distinct

  /**
   * @group agg_funcs
   * @since 1.3.0
   */
  @deprecated("Use approx_count_distinct", "2.1.0")
  def approxCountDistinct(e: Column): Column = approx_count_distinct(e)

  /**
   * @group agg_funcs
   * @since 1.3.0
   */
  @deprecated("Use approx_count_distinct", "2.1.0")
  def approxCountDistinct(columnName: String): Column = approx_count_distinct(columnName)

  /**
   * @group agg_funcs
   * @since 1.3.0
   */
  @deprecated("Use approx_count_distinct", "2.1.0")
  def approxCountDistinct(e: Column, rsd: Double): Column = approx_count_distinct(e, rsd)

  /**
   * @group agg_funcs
   * @since 1.3.0
   */
  @deprecated("Use approx_count_distinct", "2.1.0")
  def approxCountDistinct(columnName: String, rsd: Double): Column = 
    approx_count_distinct(Column(columnName), rsd)
  

  /**
   * 聚合函数:返回一组数据中不同项目的大致数量。
   *
   * @group agg_funcs
   * @since 2.1.0
   */
  def approx_count_distinct(e: Column): Column = withAggregateFunction 
    HyperLogLogPlusPlus(e.expr)
  

  /**
   * 聚合函数:返回一组数据中不同项目的大致数量。
   *
   * @group agg_funcs
   * @since 2.1.0
   */
  def approx_count_distinct(columnName: String): Column = approx_count_distinct(column(columnName))

  /**
   * 聚合函数:返回一组数据中不同项目的大致数量。
   *
   * @param rsd maximum 允许的最大相对标准偏差(默认值 = 0.05)
   *
   * @group agg_funcs
   * @since 2.1.0
   */
  def approx_count_distinct(e: Column, rsd: Double): Column = withAggregateFunction 
    HyperLogLogPlusPlus(e.expr, rsd, 0, 0)
  

  /**
   *聚合函数:返回一组数据中不同项目的大致数量。
   *
   * @param rsd maximum 允许的最大相对标准偏差(默认值 = 0.05)
   *
   * @group agg_funcs
   * @since 2.1.0
   */
  def approx_count_distinct(columnName: String, rsd: Double): Column = 
    approx_count_distinct(Column(columnName), rsd)
  

avg

  /**
   * 聚合函数:返回一组值中的平均值。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def avg(e: Column): Column = withAggregateFunction  Average(e.expr) 

  /**
   * 聚合函数:返回一组值中的平均值。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def avg(columnName: String): Column = avg(Column(columnName))

collect_list/collect_set

  /**
   * 聚合函数:返回具有重复项的对象列表。
   * 注意:该函数是不确定的,因为收集结果的顺序取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def collect_list(e: Column): Column = withAggregateFunction  CollectList(e.expr) 

  /**
   * 聚合函数:返回具有重复项的对象列表。
   * 注意:该函数是不确定的,因为收集结果的顺序取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def collect_list(columnName: String): Column = collect_list(Column(columnName))

  /**
   * 聚合函数:返回消除了重复项的对象列表。
   * 注意:该函数是不确定的,因为收集结果的顺序取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def collect_set(e: Column): Column = withAggregateFunction  CollectSet(e.expr) 

  /**
   * 聚合函数:返回消除了重复项的对象列表。
   * 注意:该函数是不确定的,因为收集结果的顺序取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def collect_set(columnName: String): Column = collect_set(Column(columnName))

corr

  /**
   * 聚合函数:返回两列的皮尔逊相关系数。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def corr(column1: Column, column2: Column): Column = withAggregateFunction 
    Corr(column1.expr, column2.expr)
  

  /**
   * 聚合函数:返回两列的皮尔逊相关系数。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def corr(columnName1: String, columnName2: String): Column = 
    corr(Column(columnName1), Column(columnName2))
  

相似度计算的时候经常会用到皮尔逊相关系数(Pearson Correlation Coefficient)。
皮尔逊系数可以看作两组数据的向量夹角的余弦。

count/count_distinct

  /**
   * 聚合函数:返回一组中的项目数。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def count(e: Column): Column = withAggregateFunction 
    e.expr match 
      // Turn count(*) into count(1)
      case s: Star => Count(Literal(1))
      case _ => Count(e.expr)
    
  

  /**
   * 聚合函数:返回一组中的项目数。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def count(columnName: String): TypedColumn[Any, Long] =
    count(Column(columnName)).as(ExpressionEncoder[Long]())

  /**
   * 聚合函数:返回一组中不同项目的数量。
   *
   * 又称 `count_distinct`,鼓励直接使用 `count_distinct` 。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  @scala.annotation.varargs
  def countDistinct(expr: Column, exprs: Column*): Column = count_distinct(expr, exprs: _*)

  /**
   * 聚合函数:返回一组中不同项目的数量。
   *
   * 又称 `count_distinct`,鼓励直接使用 `count_distinct` 。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  @scala.annotation.varargs
  def countDistinct(columnName: String, columnNames: String*): Column =
    count_distinct(Column(columnName), columnNames.map(Column.apply) : _*)

  /**
   * 聚合函数:返回一组中不同项目的数量。
   *
   * @group agg_funcs
   * @since 3.2.0
   */
  @scala.annotation.varargs
  def count_distinct(expr: Column, exprs: Column*): Column =
    // 对于像countDistinct("")这样的用法,我们应该让分析器扩展star和resolve函数。
    Column(UnresolvedFunction("count", (expr +: exprs).map(_.expr), isDistinct = true))

covar_pop/covar_samp

  /**
   * 聚合函数:返回两列的总体协方差。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def covar_pop(column1: Column, column2: Column): Column = withAggregateFunction 
    CovPopulation(column1.expr, column2.expr)
  

  /**
   *  聚合函数:返回两列的总体协方差。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def covar_pop(columnName1: String, columnName2: String): Column = 
    covar_pop(Column(columnName1), Column(columnName2))
  

  /**
   *  聚合函数:返回两列的样本协方差。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def covar_samp(column1: Column, column2: Column): Column = withAggregateFunction 
    CovSample(column1.expr, column2.expr)
  

  /**
   * 聚合函数:返回两列的样本协方差。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def covar_samp(columnName1: String, columnName2: String): Column = 
    covar_samp(Column(columnName1), Column(columnName2))
  

first

  /**
   * 聚合函数:返回一组中的第一个值。
   * 
   * 默认情况下,该函数返回它看到的第一个值。 当 ignoreNulls 设置为 true 时,它​​将返回它看到的第一个非空
   * 值。 如果所有值都为空,则返回空值。
   * 注意:
   * 该函数是不确定的,因为它的结果取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction 
    First(e.expr, ignoreNulls)
  

  /**
   * 聚合函数:返回一组中的第一个值。
   * 
   * 默认情况下,该函数返回它看到的第一个值。 当 ignoreNulls 设置为 true 时,它​​将返回它看到的第一个非空
   * 值。 如果所有值都为空,则返回空值。
   * 注意:
   * 该函数是不确定的,因为它的结果取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def first(columnName: String, ignoreNulls: Boolean): Column = 
    first(Column(columnName), ignoreNulls)
  

  /**
   * 聚合函数:返回一组中的第一个值。
   * 
   * 默认情况下,该函数返回它看到的第一个值。 当 ignoreNulls 设置为 true 时,它​​将返回它看到的第一个非空
   * 值。 如果所有值都为空,则返回空值。
   * 注意:
   * 该函数是不确定的,因为它的结果取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def first(e: Column): Column = first(e, ignoreNulls = false)

  /**
   * 聚合函数:返回一组中的第一个值。
   * 
   * 默认情况下,该函数返回它看到的第一个值。 当 ignoreNulls 设置为 true 时,它​​将返回它看到的第一个非空
   * 值。 如果所有值都为空,则返回空值。
   * 注意:
   * 该函数是不确定的,因为它的结果取决于行的顺序,这在 shuffle 之后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def first(columnName: String): Column = first(Column(columnName))

grouping/grouping_id

  /**
   * 聚合函数:指示是否聚合GROUP BY列表中的指定列,返回1表示聚合,返回0表示结果集中未聚合。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def grouping(e: Column): Column = Column(Grouping(e.expr))

  /**
   * 聚合函数:指示是否聚合GROUP BY列表中的指定列,返回1表示聚合,返回0表示结果集中未聚合。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def grouping(columnName: String): Column = grouping(Column(columnName))

  /**
   * 聚合函数:返回分组级别,等于
   *
   * 
   *   (grouping(c1) <<; (n-1)) + (grouping(c2) <<; (n-2)) + ... + grouping(cn)
   * 
   *
   * @注意 列的列表应与分组列完全匹配,或为空(表示所有分组列)。
   * 
   * grouping columns).
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def grouping_id(cols: Column*): Column = Column(GroupingID(cols.map(_.expr)))

  /**
   * 聚合函数:返回分组级别,等于
   *
   * 
   *   (grouping(c1) <<; (n-1)) + (grouping(c2) <<; (n-2)) + ... + grouping(cn)
   * 
   *
   * @注意 列的列表应与分组列完全匹配,或为空(表示所有分组列)。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def grouping_id(colName: String, colNames: String*): Column = 
    grouping_id((Seq(colName) ++ colNames).map(n => Column(n)) : _*)
  

kurtosis

  /**
   * 聚合函数:返回组中值的峰度。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def kurtosis(e: Column): Column = withAggregateFunction  Kurtosis(e.expr) 

  /**
   * 聚合函数:返回组中值的峰度。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def kurtosis(columnName: String): Column = kurtosis(Column(columnName))

峰度(peakedness;kurtosis)又称峰态系数。表征概率密度分布曲线在平均值处峰值高低的特征数。直观看来,峰度反映了峰部的尖度。样本的峰度是和正态分布相比较而言统计量,如果峰度大于三,峰的形状比较尖,比正态分布峰要陡峭。反之亦然。
在统计学中,峰度(Kurtosis)衡量实数随机变量概率分布的峰态。峰度高就意味着方差增大是由低频度的大于或小于平均值的极端差值引起的。

last

  /**
   * 聚合函数:返回组中的最后一个值。 
   * 默认情况下,该函数返回它看到的最后一个值。
   * 当ignoreNulls设置为true时,它将返回最后一个看到的非null值。
   * 如果所有值都为null,则返回null。 
   * 注意: 该函数是不确定的,因为它的结果取决于行的顺序,而行的顺序在 Shuffle 后可能是不确定的。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction 
    new Last(e.expr, ignoreNulls)
  

  /**
   * 聚合函数:返回组中的最后一个值。 
   * 默认情况下,该函数返回它看到的最后一个值。
   * 当ignoreNulls设置为true时,它将返回最后一个看到的非null值。
   * 如果所有值都为null,则返回null。 
   * 注意: 该函数是不确定的,因为它的结果取决于行的顺序,而行的顺序在 Shuffle 后可能是不确定的。
   *
   * @group agg_funcs
   * @since 2.0.0
   */
  def last(columnName: String, ignoreNulls: Boolean): Column = 
    last(Column(columnName), ignoreNulls)
  

  /**
   * 聚合函数:返回组中的最后一个值。 
   * 默认情况下,该函数返回它看到的最后一个值。
   * 当ignoreNulls设置为true时,它将返回最后一个看到的非null值。
   * 如果所有值都为null,则返回null。 
   * 注意: 该函数是不确定的,因为它的结果取决于行的顺序,而行的顺序在 Shuffle 后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def last(e: Column): Column = last(e, ignoreNulls = false)

  /**
   *    * 聚合函数:返回组中的最后一个值。 
   * 默认情况下,该函数返回它看到的最后一个值。
   * 当ignoreNulls设置为true时,它将返回最后一个看到的非null值。
   * 如果所有值都为null,则返回null。 
   * 注意: 该函数是不确定的,因为它的结果取决于行的顺序,而行的顺序在 Shuffle 后可能是不确定的。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)

max/max_by/mean/min/min_by

  /**
   * 聚合函数:返回组中表达式的最大值。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def max(e: Column): Column = withAggregateFunction  Max(e.expr) 

  /**
   * 聚合函数:返回组中表达式的最大值。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def max(columnName: String): Column = max(Column(columnName))

  /**
   * 聚合函数:返回与ord最大值关联的值。
   *
   * @group agg_funcs
   * @since 3.3.0
   */
  def max_by(e: Column, ord: Column): Column = withAggregateFunction  MaxBy(e.expr, ord.expr) 

  /**
   * 聚合函数:返回组中值的平均值。
   * "avg" 的别名。
   *
   * @group agg_funcs
   * @since 1.4.0
   */
  def mean(e: Column): Column = avg(e)

  /**
   * 聚合函数:返回组中值的平均值。
   * "avg" 的别名。
   *
   * @group agg_funcs
   * @since 1.4.0
   */
  def mean(columnName: String): Column = avg(columnName)

  /**
   * 聚合函数:返回组中表达式的最小值。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def min(e: Column): Column = withAggregateFunction  Min(e.expr) 

  /**
   * 聚合函数:返回组中表达式的最小值。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def min(columnName: String): Column = min(Column(columnName))

  /**
   * 聚合函数:返回与ord最小值关联的值。
   *
   * @group agg_funcs
   * @since 3.3.0
   */
  def min_by(e: Column, ord: Column): Column = withAggregateFunction  MinBy(e.expr, ord.expr) 

percentile_approx

  /**
   * 聚合函数:返回数值列'col'的近似百分比,该数值列'col'是有序'col'值(从最小值到最大值排序)中的最
   * 小值,因此col值小于或等于该值的百分比不超过'percentage'。 
   * 如果'percentage'是数组,则每个值必须介于0.0和1.0之间。
   * 如果是单浮点值,则必须介于0.0和1.0之间。 
   * 精度参数'accuracy'是一个正数值文字,它以内存为代价控制近似精度。
   * 精度值越高,精度越好,1.0/accuracy 是近似值的相对误差。
   *
   * @group agg_funcs
   * @since 3.1.0
   */
  def percentile_approx(e: Column, percentage: Column, accuracy: Column): Column = 
    withAggregateFunction 
      new ApproximatePercentile(
        e.expr, percentage.expr, accuracy.expr
      )
    
  

product

  /**
   * 聚合函数:返回组中所有数值元素的乘积。
   *
   * @group agg_funcs
   * @since 3.2.0
   */
  def product(e: Column): Column =
    withAggregateFunction  new Product(e.expr) 

skewness

  /**
   * 聚合函数:返回组中值的偏度。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def skewness(e: Column): Column = withAggregateFunction  Skewness(e.expr) 

  /**
   * 聚合函数:返回组中值的偏度。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def skewness(columnName: String): Column = skewness(Column(columnName))

偏度(skewness),是统计数据分布偏斜方向和程度的度量,是统计数据分布非对称程度的数字特征。偏度(Skewness)亦称偏态、偏态系数。

stddev/stddev_samp/stddev_pop

  /**
   * 聚合函数: `stddev_samp` 的别名
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def stddev(e: Column): Column = withAggregateFunction  StddevSamp(e.expr) 

  /**
   * 聚合函数: `stddev_samp` 的别名
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def stddev(columnName: String): Column = stddev(Column(columnName))

  /**
   * 聚合函数:返回组中表达式的样本标准差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def stddev_samp(e: Column): Column = withAggregateFunction  StddevSamp(e.expr) 

  /**
   * 聚合函数:返回组中表达式的样本标准差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def stddev_samp(columnName: String): Column = stddev_samp(Column(columnName))

  /**
   * 聚合函数:返回组中表达式的总体标准差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def stddev_pop(e: Column): Column = withAggregateFunction  StddevPop(e.expr) 

  /**
   * 聚合函数:返回组中表达式的总体标准差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def stddev_pop(columnName: String): Column = stddev_pop(Column(columnName))

sum/sum_distinct

  /**
   * 聚合函数:返回表达式中所有值的总和。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def sum(e: Column): Column = withAggregateFunction  Sum(e.expr) 

  /**
   * 聚合函数:返回表达式中所有值的总和。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  def sum(columnName: String): Column = sum(Column(columnName))

  /**
   * 聚合函数:返回表达式中不同值的总和。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  @deprecated("Use sum_distinct", "3.2.0")
  def sumDistinct(e: Column): Column = withAggregateFunction(Sum(e.expr), isDistinct = true)

  /**
   * 聚合函数:返回表达式中不同值的总和。
   *
   * @group agg_funcs
   * @since 1.3.0
   */
  @deprecated("Use sum_distinct", "3.2.0")
  def sumDistinct(columnName: String): Column = sum_distinct(Column(columnName))

  /**
   * 聚合函数:返回表达式中不同值的总和。
   *
   * @group agg_funcs
   * @since 3.2.0
   */
  def sum_distinct(e: Column): Column = withAggregateFunction(Sum(e.expr), isDistinct = true)

variance/var_samp/var_pop

  /**
   * 聚合函数: `var_samp` 的别名
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def variance(e: Column): Column = withAggregateFunction  VarianceSamp(e.expr) 

  /**
   * 聚合函数: `var_samp` 的别名
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def variance(columnName: String): Column = variance(Column(columnName))

  /**
   * 聚合函数:返回组中值的无偏方差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def var_samp(e: Column): Column = withAggregateFunction  VarianceSamp(e.expr) 

  /**
   * 聚合函数:返回组中值的无偏方差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def var_samp(columnName: String): Column = var_samp(Column(columnName))

  /**
   * 聚合函数:返回组中值的总体方差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def var_pop(e: Column): Column = withAggregateFunction  VariancePop(e.expr) 

  /**
   * 聚合函数:返回组中值的总体方差。
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def var_pop(columnName: String): Column = var_pop(Column(columnName))

实践

数据

employees.csv

如何在 Spark 的 github 中查看 Functions.Scala 中的代码

spark-sql 与 spark-shell REPL 中的 Spark SQL 性能差异

1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet

学习笔记Spark—— Spark SQL应用—— Spark SQL简介环境配置

Spark SQL - org.apache.spark.sql.AnalysisException

Spark SQL - 在 Spark Streams 上部署 SQL 查询的选项