Spark: accumulating history + overall statistics
Posted by zhangxuhui
Accumulating history in Spark relies mainly on window functions, while computing "all" totals across dimensions uses the rollup function.
1 Use cases:
1. We need to compute each user's total usage time to date (accumulating history).
2. The front-end pages need to query across several dimensions, e.g. product, region, and so on.
2 Source data:
product_code | event_date | duration
-------------|------------|---------
1438         | 2016-05-13 | 165
1438         | 2016-05-14 | 595
1438         | 2016-05-15 | 105
1629         | 2016-05-13 | 12340
1629         | 2016-05-14 | 13850
1629         | 2016-05-15 | 227
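The examples that follow assume this data has been loaded into a DataFrame named df_userlogs_date and registered as a temp table. A minimal setup sketch, assuming a spark-shell style environment where `sc` and `sqlContext` already exist (on Spark 1.x the SQL window-function examples may require sqlContext to be a HiveContext):

```scala
import sqlContext.implicits._

// The sample rows from section 2; product_code appears as pcode in the queries below.
val df_userlogs_date = Seq(
  ("1438", "2016-05-13", 165L),
  ("1438", "2016-05-14", 595L),
  ("1438", "2016-05-15", 105L),
  ("1629", "2016-05-13", 12340L),
  ("1629", "2016-05-14", 13850L),
  ("1629", "2016-05-15", 227L)
).toDF("pcode", "event_date", "duration")

// Register as temp tables for the SQL examples; the rollup query in section 3.2
// refers to the same data under the name userlogs_date_1.
df_userlogs_date.registerTempTable("userlogs_date")
df_userlogs_date.registerTempTable("userlogs_date_1")
```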
3 Implementing the scenarios
3.1 Scenario 1: accumulating history
As the source data shows, we already have each user's usage duration per day. When aggregating, we want the 14th to include the 13th, the 15th to include both the 14th and the 13th, and so on.
3.1.1 Spark SQL implementation
// Spark SQL: use a window function to accumulate history
sqlContext.sql("""
    select pcode, event_date,
           sum(duration) over (partition by pcode order by event_date asc) as sum_duration
    from userlogs_date
""").show

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
3.1.2 DataFrame implementation
// Use the over function on Column, passing in the window spec
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._   // for the sum aggregate function

val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")

df_userlogs_date.select(
    $"pcode",
    $"event_date",
    sum($"duration").over(first_2_now_window).as("sum_duration")
).show

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
3.1.3 Extension: accumulating over a time range
The accumulation logic in real business is often more involved than the above, for example accumulating over the previous N days, or from N days before to M days after. Let's implement these variants:
3.1.3.1 Accumulate the entire history:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
Window.partitionBy("pcode").orderBy("event_date")
The four forms above are completely equivalent; the sketch below cross-checks one of them via the DataFrame API.
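A hedged DataFrame sketch of the explicit-frame form (reusing df_userlogs_date from the setup in section 2); it should reproduce the output shown in 3.1.1:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Explicit frame: everything from the start of the partition up to the current row.
val cumulative = Window
  .partitionBy("pcode")
  .orderBy("event_date")
  .rowsBetween(Long.MinValue, 0)

df_userlogs_date
  .select($"pcode", $"event_date",
    sum($"duration").over(cumulative).as("sum_duration"))
  .show()
```

Later Spark releases also expose Window.unboundedPreceding and Window.currentRow constants that can be used in place of the raw Long.MinValue / 0 bounds.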
3.1.3.2 Accumulate the previous N days, assuming N = 3
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0)
3.1.3.3 Accumulate from N days before to M days after, assuming N = 3 and M = 5
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)
3.1.3.4 Accumulate all rows in the partition
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,Long.MaxValue)
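The three custom frames above can also be combined in a single DataFrame query. A sketch against the same sample data (the val names and output aliases are made up here):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val base = Window.partitionBy("pcode").orderBy("event_date")

val last3     = base.rowsBetween(-3, 0)                         // 3 preceding .. current row
val around    = base.rowsBetween(-3, 5)                         // 3 preceding .. 5 following
val wholePart = base.rowsBetween(Long.MinValue, Long.MaxValue)  // entire partition

df_userlogs_date.select(
  $"pcode", $"event_date",
  sum($"duration").over(last3).as("sum_prev3"),
  sum($"duration").over(around).as("sum_prev3_next5"),
  sum($"duration").over(wholePart).as("sum_partition")
).show()
```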
To summarize:
preceding: accumulates the N rows before the current row (within the partition). To start from the very first row of the partition, use unbounded. N is the offset backwards from the current row.
following: the opposite of preceding; accumulates the N rows after the current row (within the partition). To run to the end of the partition, use unbounded. N is the offset forwards from the current row.
current row: as the name suggests, the current row itself, offset 0.
Note: the "N preceding", "M following", and current row bounds above all include the row at that offset in the accumulation.
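For example, with `rows between 1 preceding and 1 following`, the frame for product 1438 on 2016-05-14 contains the rows for the 13th, 14th and 15th, so sum_duration = 165 + 595 + 105 = 865, which matches the test results below.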
3.1.3.5 Test results
Accumulate history (the current day plus all earlier days in the partition), form 1:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
Accumulate history (the current day plus all earlier days in the partition), form 2:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
Accumulate the current day and the previous day:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
Accumulate the previous day, the current day, and the next day:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following) as sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
Accumulate everything in the partition (the current day plus everything before and after it):
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following) as sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
3.2 Scenario 2: overall statistics
3.2.1 Spark SQL implementation
// Spark SQL: use rollup to add "all" totals
sqlContext.sql("""
    select pcode, event_date, sum(duration) as sum_duration
    from userlogs_date_1
    group by pcode, event_date with rollup
    order by pcode, event_date
""").show()

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|
+-----+----------+------------+

In the output, the rows containing null are the totals generated by rollup: the all-null row is the grand total across every product and date, and a row where only event_date is null is the total for that product.
3.2.2 DataFrame implementation
// Use the DataFrame rollup function for multi-dimensional "all" totals
df_userlogs_date.rollup($"pcode", $"event_date")
    .agg(sum($"duration"))
    .orderBy($"pcode", $"event_date")
    .show

+-----+----------+-------------+
|pcode|event_date|sum(duration)|
+-----+----------+-------------+
| null|      null|        27282|
| 1438|      null|          865|
| 1438|2016-05-13|          165|
| 1438|2016-05-14|          595|
| 1438|2016-05-15|          105|
| 1629|      null|        26417|
| 1629|2016-05-13|        12340|
| 1629|2016-05-14|        13850|
| 1629|2016-05-15|          227|
+-----+----------+-------------+
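A front end usually cannot display the null markers that rollup produces. One option, sketched below with made-up column aliases, is to replace them with an explicit "ALL" label:

```scala
import org.apache.spark.sql.functions.{coalesce, lit, sum}

// Label the rollup subtotal/grand-total rows as "ALL" instead of null.
df_userlogs_date
  .rollup($"pcode", $"event_date")
  .agg(sum($"duration").as("sum_duration"))
  .select(
    coalesce($"pcode", lit("ALL")).as("pcode"),
    coalesce($"event_date", lit("ALL")).as("event_date"),
    $"sum_duration")
  .orderBy($"pcode", $"event_date")
  .show()
```

If the grouping columns can themselves contain nulls this labeling becomes ambiguous; newer Spark versions provide a grouping() function to tell rollup-generated nulls apart from data nulls.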
Appendix
Below are the official API docs for the two functions used:
org.apache.spark.sql.DataFrame.scala
def rollup(col1: String, cols: String*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions. This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).

// Compute the average for all numeric columns rolluped by department and group.
df.rollup("department", "group").avg()

// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
def rollup(cols: Column*): GroupedData

Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

df.rollup($"department", $"group").avg()

// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
org.apache.spark.sql.Column.scala
def over(window: WindowSpec): Column

Define a windowing column.

val w = Window.partitionBy("name").orderBy("id")
df.select(
  sum("price").over(w.rangeBetween(Long.MinValue, 2)),
  avg("price").over(w.rowsBetween(0, 4))
)
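The rangeBetween call in the example above is worth distinguishing from rowsBetween: rowsBetween counts physical rows either side of the current row, while rangeBetween compares the value of the ORDER BY expression. A hedged sketch (df_with_day and its numeric day column are hypothetical):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val byDay = Window.partitionBy($"pcode").orderBy($"day")

df_with_day.select(
  $"pcode", $"day",
  // the previous row (whatever its day value) plus the current row
  sum($"duration").over(byDay.rowsBetween(-1, 0)).as("sum_prev_row"),
  // all rows whose day value is within 1 below the current row's day
  sum($"duration").over(byDay.rangeBetween(-1, 0)).as("sum_prev_day")
).show()
```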