SparkSql中的窗口函数

Posted YaoYong_BigData

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSql中的窗口函数相关的知识,希望对你有一定的参考价值。

一、第一名和第二名案例

1.1  需求介绍

1.1.1 数据集

  • product : 商品名称

  • categroy : 类别

  • revenue : 收入

1.1.2 需求分析

需求

  • 从数据集中得到每个类别收入第一的商品和收入第二的商品

    关键点是, 每个类别, 收入前两名

方案1: 使用常见语法子查询

  • 问题1: Spark 和 Hive 这样的系统中, 有自增主键吗? 没有

  • 问题2: 为什么分布式系统中很少见自增主键? 因为分布式环境下数据在不同的节点中, 很难保证顺序

  • 解决方案: 按照某一列去排序, 取前两条数据

  • 遗留问题: 不容易在分组中取每一组的前两个

SELECT * FROM productRevenue ORDER BY revenue LIMIT 2

方案2: 计算每一个类别的按照收入排序的序号, 取每个类别中的前两个

思路步骤:

1)按照类别分组

2)每个类别中的数据按照收入排序

3)为排序过的数据增加编号

4)取得每个类别中的前两个数据作为最终结果

使用 SQL 就不太容易做到, 需要一个语法, 叫做窗口函数。

1.2. 代码编写

1.2.1 创建初始环境

1)创建新的类 WindowFunction

2)编写测试方法

3)初始化 SparkSession

4)创建数据集

class WindowFunction 

  @Test
  def firstSecond(): Unit = 
    val spark = SparkSession.builder()
      .appName("window")
      .master("local[6]")
      .getOrCreate()

    import spark.implicits._

    val data = Seq(
      ("Thin", "Cell phone", 6000),
      ("Normal", "Tablet", 1500),
      ("Mini", "Tablet", 5500),
      ("Ultra thin", "Cell phone", 5000),
      ("Very thin", "Cell phone", 6000),
      ("Big", "Tablet", 2500),
      ("Bendable", "Cell phone", 3000),
      ("Foldable", "Cell phone", 3000),
      ("Pro", "Tablet", 4500),
      ("Pro2", "Tablet", 6500)
    )

    val source = data.toDF("product", "category", "revenue")
  

1.2.2 方式一: SQL 语句

SELECT
  product,
  category,
  revenue
FROM (
  SELECT
    product,
    category,
    revenue,
    dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  FROM productRevenue) tmp
WHERE
  rank <= 2
  • 窗口函数在 SQL 中的完整语法如下

    function OVER (PARITION BY ... ORDER BY ... FRAME_TYPE BETWEEN ... AND ...)

1.2.3 方式二: 使用 DataFrame 的命令式 API

val window: WindowSpec = Window.partitionBy('category)
  .orderBy('revenue.desc)

source.select('product, 'category, 'revenue, dense_rank() over window as "rank")
  .where('rank <= 2)
  .show()
  • WindowSpec : 窗口的描述符, 描述窗口应该是怎么样的

  • dense_rank() over window : 表示一个叫做 dense_rank() 的函数作用于每一个窗口

1.3 总结

  • 在 Spark 中, 使用 SQL 或者 DataFrame 都可以操作窗口。

  • 窗口的使用有两个步骤:

  • 1)定义窗口规则 ;

  • 2)定义窗口函数。
  • 在不同的范围内统计名次时, 窗口函数非常得力。

二、窗口函数

2.1 窗口函数的逻辑

从 逻辑 上来讲, 窗口函数执行步骤大致可以分为如下几步:

dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank

1)根据 PARTITION BY category, 对数据进行分组

2)分组后, 根据 ORDER BY revenue DESC 对每一组数据进行排序

3)在 每一条数据 到达窗口函数时, 套入窗口内进行计算

从语法的角度上讲, 窗口函数大致分为两个部分:

dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  • 函数部分: dense_rank()

  • 窗口定义部分: PARTITION BY category ORDER BY revenue DESC

注意:

        窗口函数和 GroupBy 最大的区别, 就是 GroupBy 的聚合对每一个组只有一个结果, 而窗口函数可以对每一条数据都有一个结果。

        说白了, 窗口函数其实就是根据当前数据, 计算其在所在的组中的统计数据。

2.2 窗口定义部分

window_function (expression) OVER (
   [ PARTITION BY part_list ]
   [ ORDER BY order_list ]
   [  ROWS | RANGE  BETWEEN frame_start AND frame_end ] )

2.2.1 Partition 定义

控制哪些行会被放在一起, 会将同一个分组的数据放在同一台机器中处理

2.2.2 Order 定义

2.2.3 Frame 定义

释义:

  • 窗口函数会针对 每一个组中的每一条数据 进行统计聚合或者 rank, 一个组又称为一个 Frame

  • 分组由两个字段控制, Partition 在整体上进行分组和分区

  • 而通过 Frame 可以通过 当前行 来更细粒度的分组控制

    举个栗子, 例如公司每月销售额的数据, 统计其同比增长率, 那就需要把这条数据和前面一条数据进行结合计算了。

有哪些控制方式?

1)Row Frame

通过 "行号" 来表示。

2)Range Frame

通过某一个列的差值来表示。

range between 2000 preceding and 1000 following:表示当前行的值往前减2000和往后加1000这个区间范围。如下图所示:

2.3 函数部分

dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank

如下是支持的窗口函数:

类型函数解释

排名函数

rank

  • 排名函数, 计算当前数据在其 Frame 中的位置

  • 如果有重复, 则重复项后面的行号会有空挡

dense_rank

  • 和 rank 一样, 但是结果中没有空挡

row_number

  • 和 rank 一样, 也是排名, 但是不同点是即使有重复项, 排名依然增长

分析函数

first_value

获取这个组第一条数据

last_value

获取这个组最后一条数据

lag

lag(field, n) 获取当前数据的 field 列向前 n 条数据

lead

lead(field, n) 获取当前数据的 field 列向后 n 条数据

聚合函数

*

所有的 functions 中的聚合函数都支持

2.4 窗口函数元素搭配

 

 2.5 举例说明

SELECT user_id,
       course,
       score,
       ROW_NUMBER() OVER(PARTITION BY course ORDER BY score ASC) AS rn,
       FIRST_VALUE(score) OVER(PARTITION BY course ORDER BY score ASC) AS first_scorea,
       FIRST_VALUE(score) OVER(PARTITION BY course ORDER BY score DESC) AS first_scored,
       FIRST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score ASC) AS first_usera,
       FIRST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score DESC, user_id ASC) AS first_userda,
       LAST_VALUE(score) OVER(PARTITION BY course ORDER BY score) AS last_scorea,
       LAST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score ASC,user_id ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS last_user_upcr,
       LAST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score ASC,user_id ASC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_user_upuf,
       LAG(score,1,0) OVER(PARTITION BY course ORDER BY score) AS lag_1_0
  FROM student_score
  ORDER BY course,
          rn;

2.6 总结

窗口操作分为两个部分:

  • 窗口定义, 定义时可以指定 PartitionOrderFrame

  • 函数操作, 可以使用三大类函数, 排名函数, 分析函数, 聚合函数

具体的开窗函数使用方式可参考以下文章:

Hive开窗函数总结_YaoYong_BigData的博客-CSDN博客_hive中的开窗函数

三、最优差值案例

3.1 需求介绍

源数据集:

需求:

统计每个商品和此品类最贵商品之间的差值。

目标数据集: 

3.2 代码实现

package cn.yy.spark.sql

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

object WindowFun 

  def main(args: Array[String]): Unit = 
    val spark = SparkSession.builder()
      .appName("window")
      .master("local[6]")
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val data = Seq(
      ("Thin", "Cell phone", 6000),
      ("Normal", "Tablet", 1500),
      ("Mini", "Tablet", 5500),
      ("Ultra thin", "Cell phone", 5500),
      ("Very thin", "Cell phone", 6000),
      ("Big", "Tablet", 2500),
      ("Bendable", "Cell phone", 3000),
      ("Foldable", "Cell phone", 3000),
      ("Pro", "Tablet", 4500),
      ("Pro2", "Tablet", 6500)
    )

    val source = data.toDF("product", "category", "revenue")

    // 1. 定义窗口, 按照分类进行倒序排列
    val window = Window.partitionBy('category)
      .orderBy('revenue.desc)

    // 2. 找到最贵的的商品价格
    val maxPrice: sql.Column = max('revenue) over window

    // 3. 得到结果
    source.select('product, 'category, 'revenue, (maxPrice - 'revenue) as "revenue_difference")
      .show()
  

运行结果:

+----------+----------+-------+------------------+
|   product|  category|revenue|revenue_difference|
+----------+----------+-------+------------------+
|      Thin|Cell phone|   6000|                 0|
| Very thin|Cell phone|   6000|                 0|
|Ultra thin|Cell phone|   5500|               500|
|  Bendable|Cell phone|   3000|              3000|
|  Foldable|Cell phone|   3000|              3000|
|      Pro2|    Tablet|   6500|                 0|
|      Mini|    Tablet|   5500|              1000|
|       Pro|    Tablet|   4500|              2000|
|       Big|    Tablet|   2500|              4000|
|    Normal|    Tablet|   1500|              5000|
+----------+----------+-------+------------------+

以上是关于SparkSql中的窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

SparkSql中的窗口函数

SparkSql中的窗口函数

sparksql系列 sparksql列操作窗口函数join

sparksql比hivesql优化的点(窗口函数)

大数据之Spark:SparkSQL开窗函数实战

Spark 和 SparkSQL:如何模仿窗口功能?