SparkSql中的窗口函数
Posted YaoYong_BigData
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSql中的窗口函数相关的知识,希望对你有一定的参考价值。
一、第一名和第二名案例
1.1 需求介绍
1.1.1 数据集
-
product
: 商品名称 -
categroy
: 类别 -
revenue
: 收入
1.1.2 需求分析
需求
-
从数据集中得到每个类别收入第一的商品和收入第二的商品
关键点是, 每个类别, 收入前两名
方案1: 使用常见语法子查询
-
问题1:
Spark
和Hive
这样的系统中, 有自增主键吗? 没有 -
问题2: 为什么分布式系统中很少见自增主键? 因为分布式环境下数据在不同的节点中, 很难保证顺序
-
解决方案: 按照某一列去排序, 取前两条数据
-
遗留问题: 不容易在分组中取每一组的前两个
SELECT * FROM productRevenue ORDER BY revenue LIMIT 2
方案2: 计算每一个类别的按照收入排序的序号, 取每个类别中的前两个
思路步骤:
1)按照类别分组
2)每个类别中的数据按照收入排序
3)为排序过的数据增加编号
4)取得每个类别中的前两个数据作为最终结果
使用 SQL
就不太容易做到, 需要一个语法, 叫做窗口函数。
1.2. 代码编写
1.2.1 创建初始环境
1)创建新的类 WindowFunction
2)编写测试方法
3)初始化 SparkSession
4)创建数据集
class WindowFunction
@Test
def firstSecond(): Unit =
val spark = SparkSession.builder()
.appName("window")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val data = Seq(
("Thin", "Cell phone", 6000),
("Normal", "Tablet", 1500),
("Mini", "Tablet", 5500),
("Ultra thin", "Cell phone", 5000),
("Very thin", "Cell phone", 6000),
("Big", "Tablet", 2500),
("Bendable", "Cell phone", 3000),
("Foldable", "Cell phone", 3000),
("Pro", "Tablet", 4500),
("Pro2", "Tablet", 6500)
)
val source = data.toDF("product", "category", "revenue")
1.2.2 方式一: SQL
语句
SELECT
product,
category,
revenue
FROM (
SELECT
product,
category,
revenue,
dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
FROM productRevenue) tmp
WHERE
rank <= 2
-
窗口函数在
SQL
中的完整语法如下function OVER (PARITION BY ... ORDER BY ... FRAME_TYPE BETWEEN ... AND ...)
1.2.3 方式二: 使用 DataFrame
的命令式 API
val window: WindowSpec = Window.partitionBy('category)
.orderBy('revenue.desc)
source.select('product, 'category, 'revenue, dense_rank() over window as "rank")
.where('rank <= 2)
.show()
-
WindowSpec
: 窗口的描述符, 描述窗口应该是怎么样的 -
dense_rank() over window
: 表示一个叫做dense_rank()
的函数作用于每一个窗口
1.3 总结
-
在
Spark
中, 使用SQL
或者DataFrame
都可以操作窗口。 -
窗口的使用有两个步骤:
-
1)定义窗口规则 ;
- 2)定义窗口函数。
-
在不同的范围内统计名次时, 窗口函数非常得力。
二、窗口函数
2.1 窗口函数的逻辑
从 逻辑 上来讲, 窗口函数执行步骤大致可以分为如下几步:
dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
1)根据 PARTITION BY category
, 对数据进行分组
2)分组后, 根据 ORDER BY revenue DESC
对每一组数据进行排序
3)在 每一条数据 到达窗口函数时, 套入窗口内进行计算
从语法的角度上讲, 窗口函数大致分为两个部分:
dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
-
函数部分:
dense_rank()
-
窗口定义部分:
PARTITION BY category ORDER BY revenue DESC
注意:
窗口函数和
GroupBy
最大的区别, 就是GroupBy
的聚合对每一个组只有一个结果, 而窗口函数可以对每一条数据都有一个结果。说白了, 窗口函数其实就是根据当前数据, 计算其在所在的组中的统计数据。
2.2 窗口定义部分
window_function (expression) OVER (
[ PARTITION BY part_list ]
[ ORDER BY order_list ]
[ ROWS | RANGE BETWEEN frame_start AND frame_end ] )
2.2.1 Partition
定义
控制哪些行会被放在一起, 会将同一个分组的数据放在同一台机器中处理
2.2.2 Order
定义
2.2.3 Frame
定义
释义:
-
窗口函数会针对 每一个组中的每一条数据 进行统计聚合或者
rank
, 一个组又称为一个Frame
-
分组由两个字段控制,
Partition
在整体上进行分组和分区 -
而通过
Frame
可以通过 当前行 来更细粒度的分组控制举个栗子, 例如公司每月销售额的数据, 统计其同比增长率, 那就需要把这条数据和前面一条数据进行结合计算了。
有哪些控制方式?
1)Row Frame
通过 "行号"
来表示。
2)Range Frame
通过某一个列的差值来表示。
range between 2000 preceding and 1000 following:表示当前行的值往前减2000和往后加1000这个区间范围。如下图所示:
2.3 函数部分
dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
如下是支持的窗口函数:
类型 | 函数 | 解释 |
---|---|---|
排名函数 |
|
|
|
| |
|
| |
分析函数 |
| 获取这个组第一条数据 |
| 获取这个组最后一条数据 | |
|
| |
|
| |
聚合函数 |
| 所有的 |
2.4 窗口函数元素搭配
2.5 举例说明
SELECT user_id,
course,
score,
ROW_NUMBER() OVER(PARTITION BY course ORDER BY score ASC) AS rn,
FIRST_VALUE(score) OVER(PARTITION BY course ORDER BY score ASC) AS first_scorea,
FIRST_VALUE(score) OVER(PARTITION BY course ORDER BY score DESC) AS first_scored,
FIRST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score ASC) AS first_usera,
FIRST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score DESC, user_id ASC) AS first_userda,
LAST_VALUE(score) OVER(PARTITION BY course ORDER BY score) AS last_scorea,
LAST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score ASC,user_id ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS last_user_upcr,
LAST_VALUE(user_id) OVER(PARTITION BY course ORDER BY score ASC,user_id ASC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_user_upuf,
LAG(score,1,0) OVER(PARTITION BY course ORDER BY score) AS lag_1_0
FROM student_score
ORDER BY course,
rn;
2.6 总结
窗口操作分为两个部分:
-
窗口定义, 定义时可以指定
Partition
,Order
,Frame
-
函数操作, 可以使用三大类函数, 排名函数, 分析函数, 聚合函数
具体的开窗函数使用方式可参考以下文章:
Hive开窗函数总结_YaoYong_BigData的博客-CSDN博客_hive中的开窗函数
三、最优差值案例
3.1 需求介绍
源数据集:
需求:
统计每个商品和此品类最贵商品之间的差值。
目标数据集:
3.2 代码实现
package cn.yy.spark.sql
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
object WindowFun
def main(args: Array[String]): Unit =
val spark = SparkSession.builder()
.appName("window")
.master("local[6]")
.getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
val data = Seq(
("Thin", "Cell phone", 6000),
("Normal", "Tablet", 1500),
("Mini", "Tablet", 5500),
("Ultra thin", "Cell phone", 5500),
("Very thin", "Cell phone", 6000),
("Big", "Tablet", 2500),
("Bendable", "Cell phone", 3000),
("Foldable", "Cell phone", 3000),
("Pro", "Tablet", 4500),
("Pro2", "Tablet", 6500)
)
val source = data.toDF("product", "category", "revenue")
// 1. 定义窗口, 按照分类进行倒序排列
val window = Window.partitionBy('category)
.orderBy('revenue.desc)
// 2. 找到最贵的的商品价格
val maxPrice: sql.Column = max('revenue) over window
// 3. 得到结果
source.select('product, 'category, 'revenue, (maxPrice - 'revenue) as "revenue_difference")
.show()
运行结果:
+----------+----------+-------+------------------+
| product| category|revenue|revenue_difference|
+----------+----------+-------+------------------+
| Thin|Cell phone| 6000| 0|
| Very thin|Cell phone| 6000| 0|
|Ultra thin|Cell phone| 5500| 500|
| Bendable|Cell phone| 3000| 3000|
| Foldable|Cell phone| 3000| 3000|
| Pro2| Tablet| 6500| 0|
| Mini| Tablet| 5500| 1000|
| Pro| Tablet| 4500| 2000|
| Big| Tablet| 2500| 4000|
| Normal| Tablet| 1500| 5000|
+----------+----------+-------+------------------+
以上是关于SparkSql中的窗口函数的主要内容,如果未能解决你的问题,请参考以下文章