Spark SQL自定义函数

Posted 赵广陆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL自定义函数相关的知识,希望对你有一定的参考价值。


1 Spark SQL自定义函数

1.1 自定义函数分类

类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能。
spark中的自定义函数有如下3类

1.UDF(User-Defined-Function)
输入一行,输出一行

2.UDAF(User-Defined Aggregation Funcation)
输入多行,输出一行

3.UDTF(User-Defined Table-Generating Functions)
输入一行,输出多行

1.2 自定义UDF





●需求
有udf.txt数据格式如下:

Hello
abc
study
small

通过自定义UDF函数将每一行数据转换成大写
select value,smallToBig(value) from t_word

●代码演示

package cn.oldlu.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset, SparkSession


object UDFDemo 
  def main(args: Array[String]): Unit = 
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDS: Dataset[String] = spark.read.textFile("D:\\\\data\\\\udf.txt")
    fileDS.show()
    /*
    +----------+
    |     value|
    +----------+
    |helloworld|
    |       abc|
    |     study|
    | smallWORD|
    +----------+
     */
   /*
    将每一行数据转换成大写
    select value,smallToBig(value) from t_word
    */
    //注册一个函数名称为smallToBig,功能是传入一个String,返回一个大写的String
    spark.udf.register("smallToBig",(str:String) => str.toUpperCase())
    fileDS.createOrReplaceTempView("t_word")
    //使用我们自己定义的函数
    spark.sql("select value,smallToBig(value) from t_word").show()
    /*
    +----------+---------------------+
    |     value|UDF:smallToBig(value)|
    +----------+---------------------+
    |helloworld|           HELLOWORLD|
    |       abc|                  ABC|
    |     study|                STUDY|
    | smallWORD|            SMALLWORD|
    +----------+---------------------+
     */
    sc.stop()
    spark.stop()
  

1.3 自定义UDAF

虽然小编说UDAF只做了解,但也只需要掌握的。在企业中,这也是你和别人拉开差距的地方

●需求

有udaf.json数据内容如下

"name":"Michael","salary":3000
"name":"Andy","salary":4500
"name":"Justin","salary":3500
"name":"Berta","salary":4000

求取平均工资

●继承UserDefinedAggregateFunction方法重写说明

inputSchema:输入数据的类型

bufferSchema:产生中间结果的数据类型

dataType:最终返回的结果类型

deterministic:确保一致性,一般用true

initialize:指定初始值

update:每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算)

merge:全局聚合(将每个分区的结果进行聚合)

evaluate:计算最终的结果

●代码演示

package cn.oldlu.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.MutableAggregationBuffer, UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame, Row, SparkSession


object UDAFDemo 
  def main(args: Array[String]): Unit = 
    //1.获取sparkSession
    val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val employeeDF: DataFrame = spark.read.json("D:\\\\data\\\\udaf.json")
    //3.创建临时表
    employeeDF.createOrReplaceTempView("t_employee")
    //4.注册UDAF函数
    spark.udf.register("myavg",new MyUDAF)
    //5.使用自定义UDAF函数
    spark.sql("select myavg(salary) from t_employee").show()
    //6.使用内置的avg函数
    spark.sql("select avg(salary) from t_employee").show()
  

class MyUDAF extends UserDefinedAggregateFunction
  //输入的数据类型的schema
  override def inputSchema: StructType = 
     StructType(StructField("input",LongType)::Nil)
  
  //缓冲区数据类型schema,就是转换之后的数据的schema
  override def bufferSchema: StructType = 
    StructType(StructField("sum",LongType)::StructField("total",LongType)::Nil)
  
  //返回值的数据类型
  override def dataType: DataType = 
    DoubleType
  
  //确定是否相同的输入会有相同的输出
  override def deterministic: Boolean = 
    true
  
  //初始化内部数据结构
  override def initialize(buffer: MutableAggregationBuffer): Unit = 
    buffer(0) = 0L
    buffer(1) = 0L
  
  //更新数据内部结构,区内计算
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = 
    //所有的金额相加
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    //一共有多少条数据
    buffer(1) = buffer.getLong(1) + 1
  
  //来自不同分区的数据进行合并,全局合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 
    buffer1(0) =buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  
  //计算输出数据值
  override def evaluate(buffer: Row): Any = 
    buffer.getLong(0).toDouble / buffer.getLong(1)
  

2 开窗函数

2.1、概述


●介绍
开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

●聚合函数和开窗函数
聚合函数是将多行变成一行,count,avg…
开窗函数是将一行变成多行;
聚合函数如果要显示其他的列必须将列加入到group by中
开窗函数可以不使用group by,直接将所有信息显示出来

●开窗函数分类
1.聚合开窗函数
聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。
==2.排序开窗函数 ==
排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

2.2.准备工作

/export/servers/spark/bin/spark-shell

case class Score(name: String, clazz: Int, score: Int)
val scoreDF = spark.sparkContext.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78),
Score("a11", 3, 100))
).toDF("name", "class", "score")
scoreDF.createOrReplaceTempView("scores")
scoreDF.show()

查看数据结果:

+----+-----+-----+
|name|class|score|
+----+-----+-----+
|  a1|    1|   80|
|  a2|    1|   78|
|  a3|    1|   95|
|  a4|    2|   74|
|  a5|    2|   92|
|  a6|    3|   99|
|  a7|    3|   99|
|  a8|    3|   45|
|  a9|    3|   55|
| a10|    3|   78|
| a11|    3|  100|
+----+-----+-----+

2.3. 聚合开窗函数

●示例1
OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。
SQL标准允许将所有聚合函数用做聚合开窗函数。


spark.sql("select  count(name)  from scores").show
spark.sql("select name, class, score, count(name) over() name_count from scores").show

结果如下所示:

+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|        11|
|  a2|    1|   78|        11|
|  a3|    1|   95|        11|
|  a4|    2|   74|        11|
|  a5|    2|   92|        11|
|  a6|    3|   99|        11|
|  a7|    3|   99|        11|
|  a8|    3|   45|        11|
|  a9|    3|   55|        11|
| a10|    3|   78|        11|
| a11|    3|  100|        11|
+----+-----+-----+----------+

●示例2
OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。
如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。

开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。

下面的 SQL 语句用于显示按照班级分组后每组的人数
OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

查询结果如下所示:

+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|         3|
|  a2|    1|   78|         3|
|  a3|    1|   95|         3|
|  a6|    3|   99|         6|
|  a7|    3|   99|         6|
|  a8|    3|   45|         6|
|  a9|    3|   55|         6|
| a10|    3|   78|         6|
| a11|    3|  100|         6|
|  a4|    2|   74|         2|
|  a5|    2|   92|         2|
+----+-----+-----+----------+

2.4. 排序开窗函数

2.4.1 ROW_NUMBER顺序排序



row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

注意:
在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。

●示例1

只做排序,按成绩排序

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()

结果如下:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   5|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|  10|
| a11|    3|  100|  11|
+----+-----+-----+----+

分区并排序,按班级分区并按成绩排序

spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

2.4.2 RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。

这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数

简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名ÿ

以上是关于Spark SQL自定义函数的主要内容,如果未能解决你的问题,请参考以下文章

spark-sql 自定义函数

Spark sql实现自定义函数

Spark sql实现自定义函数

Spark sql实现自定义函数

Spark sql实现自定义函数

spark-sql自定义函数UDF和UDAF