Spark---Custom Functions (UDF/UDAF/UDTF)

Posted by Shall潇


As with custom functions in Hive, there are three kinds (a sketch with built-in analogues follows the list):

  • UDF — one-to-one: one input row produces one output value
  • UDAF — many-to-one: a group of rows is aggregated into one value
  • UDTF — one-to-many: one input row is expanded into several output rows
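
For intuition, all three cardinalities already exist among Spark's built-in functions. A minimal self-contained sketch (the people table and its columns are invented purely for illustration):

import org.apache.spark.sql.SparkSession

object CardinalityDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cardinality").getOrCreate()
    import spark.implicits._

    // invented sample table, for illustration only
    Seq(("alice", "F", 20, "jogging,cooking"), ("tom", "M", 24, "shopping"))
      .toDF("name", "gender", "age", "hobbies")
      .createOrReplaceTempView("people")

    spark.sql("select length(name) from people").show()                      // one-to-one, like a UDF
    spark.sql("select gender, avg(age) from people group by gender").show()  // many-to-one, like a UDAF
    spark.sql("select explode(split(hobbies, ',')) from people").show()      // one-to-many, like a UDTF
  }
}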

UDF

hobbies.txt

alice jogging,Coding,cooking
lina travel,dance
tom shopping,working
jack basketball,swimming

package udf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession


/*
* Custom function (UDF): one row in, one value out
*
* Count the number of hobbies per person
*
* */

case class Hobbies(name:String,hobby:String)

object SparkUDFDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDFFunction2")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val info = sc.textFile("in/hobbies.txt")
    import spark.implicits._
    val hobbyDF = info.map(_.split(" ")).map(x=>Hobbies(x(0),x(1))).toDF
    hobbyDF.show()

    hobbyDF.createOrReplaceTempView("hob")

    //register the custom function
    spark.udf.register("hob_num",(s:String)=>s.split(",").size) //first argument: function name; second: the function's implementation
    spark.sql("select name,hobby,hob_num(hobby) as hobby_number from hob").show(false)  //truncate = false: show column contents in full
  }
}
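
The same UDF can also be applied through the DataFrame API instead of SQL. A minimal sketch, continuing from the program above (hobNum is an invented name; org.apache.spark.sql.functions.udf is the standard entry point):

import org.apache.spark.sql.functions.udf

val hobNum = udf((s: String) => s.split(",").length)             // same logic as hob_num
hobbyDF.withColumn("hobby_number", hobNum($"hobby")).show(false)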

UDAF

package udf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession, types}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/*
*  Custom aggregate function (UDAF): many rows in, one value out
*
*  Implements an Avg function
* */

case class StudentS(id:Integer,name:String,gender:String,age:Integer)

object UDAFDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDAF")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
	//sample data
    val students = Seq(
      StudentS(1, "zhangsan", "F", 20),
      StudentS(2, "lisi", "M", 24),
      StudentS(3, "wangwu", "M", 23),
      StudentS(4, "zhaoliu", "F", 21),
      StudentS(5, "qianqi", "F", 45),
      StudentS(6, "sunba", "F", 39),
      StudentS(7, "zhoujiu", "F", 43),
      StudentS(8, "wuhuang", "M", 29),
      StudentS(9, "zhengtan", "F", 24),
      StudentS(10, "xujiao", "M", 22),
      StudentS(11, "hanxing", "F", 24)
    )

    val frame = students.toDF()
    frame.createOrReplaceTempView("student")
    val myudaf = new MyAgeAvgFunction
    spark.udf.register("myavg",myudaf)
    spark.sql("select gender,myavg(age) as avgAge from student group by gender").show()
  }
}
class MyAgeAvgFunction extends UserDefinedAggregateFunction{   //the custom aggregate function
  //data type of the input
  override def inputSchema: StructType = {
    new StructType().add("age",LongType)
//    StructType(StructField("age",LongType) :: Nil)   //equivalent way to build the schema
  }
  //schema of the aggregation buffer
  override def bufferSchema: StructType = {
    new StructType().add("sum",LongType).add("count",LongType)
  }
  //data type of the result
  override def dataType: DataType = DoubleType

  //whether the function is deterministic: the same input always produces the same output
  override def deterministic: Boolean = true

  //initialize the buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  //fold one new input row into the buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0)+input.getLong(0)   //accumulate the ages
    buffer(1) = buffer.getLong(1)+1                  //increment the count
  }

  //merge buffers from different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0)+buffer2.getLong(0)  //add up the age sums from each partition
    buffer1(1) = buffer1.getLong(1)+buffer2.getLong(1)  //add up the counts from each partition
  }
  }

  //compute the final result
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble/buffer.getLong(1)      // age sum / count; convert before dividing to avoid integer division
  }
}
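
Note that UserDefinedAggregateFunction is deprecated as of Spark 3.0 in favor of Aggregator. A sketch of the same average in the newer style, assuming Spark 3.0+ (MyAvg and AvgBuffer are invented names):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

case class AvgBuffer(var sum: Long, var count: Long)

object MyAvg extends Aggregator[Long, AvgBuffer, Double] {
  def zero: AvgBuffer = AvgBuffer(0L, 0L)                        // initial buffer
  def reduce(b: AvgBuffer, age: Long): AvgBuffer = {             // fold one input row into the buffer
    b.sum += age; b.count += 1; b
  }
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {         // combine buffers from different partitions
    b1.sum += b2.sum; b1.count += b2.count; b1
  }
  def finish(b: AvgBuffer): Double = b.sum.toDouble / b.count    // final result
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// registration and the SQL stay the same:
// spark.udf.register("myavg", functions.udaf(MyAvg))
// spark.sql("select gender,myavg(age) as avgAge from student group by gender").show()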

UDTF

udtf.txt

01//zs//Hadoop scala spark hive hbase
02//ls//Hadoop scala kafka hive hbase Oozie
03//ww//Hadoop scala spark hive sqoop

package udf

import java.util

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object UDTFDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDTFDemo")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config("hive.metastore.uris","thrift://192.168.XXX.100:9083").master("local[*]").appName("utd")
      .enableHiveSupport().getOrCreate()

    val info = sc.textFile("in/udtf.txt")
    import spark.implicits._

    val infoDF = info.map(x => x.split("//")).filter(x => x(1).equals("ls"))
      .map(x => (x(0), x(1), x(2))).toDF("id","name","class")
    infoDF.createOrReplaceTempView("table_udtf")
//    infoDF.printSchema()
//    infoDF.show()
//    spark.sql("select * from table_udtf").show(false)

    spark.sql("create temporary function myudtf as 'udf.myUDTF'") //创建临时方法
    spark.sql("select myudtf(class) from table_udtf").show()      //使用自定义方法
  }
}
class myUDTF extends GenericUDTF{

  //initialize: declare the output schema
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    val fieldName = new util.ArrayList[String]()
    val fieldOIS = new util.ArrayList[ObjectInspector]()
    //define the output column name and type
    fieldName.add("type")
    fieldOIS.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)   //String type
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldName,fieldOIS)
  }

  //process one input row
  //input:  Hadoop scala spark hive hbase
  /*output: one row per word, in a single String column named "type":
               Hadoop
               scala
               spark
               hive
               hbase
  */
  override def process(objects: Array[AnyRef]): Unit = {
    val strs = objects(0).toString.split(" ")  //split the string on spaces into an array of words
    for (str <- strs) {
      val temp = new Array[String](1)
      temp(0) = str
      forward(temp)   //emit one output row per word
    }
  }

  //release resources
  override def close(): Unit = {}
}
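
A UDTF called this way can only stand alone in the select list. To keep id and name next to the generated rows, the usual pattern is LATERAL VIEW; a sketch continuing from the program above (the aliases t and type are arbitrary):

spark.sql(
  """select id, name, t.type
    |from table_udtf
    |lateral view myudtf(class) t as type""".stripMargin).show()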
