Spark Custom Functions (UDF / UDAF / UDTF)
Posted by Shall潇
Like custom functions in Hive, Spark supports three kinds:
- UDF: one row in, one row out
- UDAF: many rows in, one row out (aggregation)
- UDTF: one row in, many rows out (table-generating)
UDF
hobbies.txt
alice jogging,Coding,cooking
lina travel,dance
tom shopping,working
jack basketball,swimming
package udf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/*
 * Custom function (UDF): one row in, one row out.
 *
 * Counts the number of hobbies per person.
 */
case class Hobbies(name: String, hobby: String)

object SparkUDFDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDFFunction2")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val info = sc.textFile("in/hobbies.txt")
    import spark.implicits._
    val hobbyDF = info.map(_.split(" ")).map(x => Hobbies(x(0), x(1))).toDF()
    hobbyDF.show()
    hobbyDF.createOrReplaceTempView("hob")

    // Register the UDF: the first argument is the SQL function name,
    // the second is the function body
    spark.udf.register("hob_num", (s: String) => s.split(",").size)

    // truncate = false: show full column contents without shortening
    spark.sql("select name, hobby, hob_num(hobby) as hobby_number from hob").show(false)
  }
}
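The same counting logic can also be used directly from the DataFrame API, without registering a SQL name, by wrapping the lambda with org.apache.spark.sql.functions.udf. A minimal sketch (hobNum and the hobby_number column name are illustrative):

import org.apache.spark.sql.functions.{col, udf}

// Same logic as hob_num, applied as a column expression
val hobNum = udf((s: String) => s.split(",").length)
hobbyDF.withColumn("hobby_number", hobNum(col("hobby"))).show(false)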
UDAF
package udf

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/*
 * Custom aggregate function (UDAF): many rows in, one row out.
 *
 * Implements an Avg function.
 */
case class StudentS(id: Integer, name: String, gender: String, age: Integer)

object UDAFDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDAF")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // Sample data
    val students = Seq(
      StudentS(1, "zhangsan", "F", 20),
      StudentS(2, "lisi", "M", 24),
      StudentS(3, "wangwu", "M", 23),
      StudentS(4, "zhaoliu", "F", 21),
      StudentS(5, "qianqi", "F", 45),
      StudentS(6, "sunba", "F", 39),
      StudentS(7, "zhoujiu", "F", 43),
      StudentS(8, "wuhuang", "M", 29),
      StudentS(9, "zhengtan", "F", 24),
      StudentS(10, "xujiao", "M", 22),
      StudentS(11, "hanxing", "F", 24)
    )
    val frame = students.toDF()
    frame.createOrReplaceTempView("student")

    // Register the UDAF under the SQL name "myavg"
    val myudaf = new MyAgeAvgFunction
    spark.udf.register("myavg", myudaf)
    spark.sql("select gender, myavg(age) as avgAge from student group by gender").show()
  }
}
class MyAgeAvgFunction extends UserDefinedAggregateFunction {
  // Schema of the input data
  override def inputSchema: StructType = {
    new StructType().add("age", LongType)
    // Equivalent: StructType(StructField("age", LongType) :: Nil)
  }

  // Schema of the aggregation buffer
  override def bufferSchema: StructType = {
    new StructType().add("sum", LongType).add("count", LongType)
  }

  // Result type of the aggregate function
  override def dataType: DataType = DoubleType

  // Whether the same input always produces the same output
  override def deterministic: Boolean = true

  // Initialize the buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // Called for each new input row
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0) // accumulate ages
    buffer(1) = buffer.getLong(1) + 1                // increment count
  }

  // Merge partial buffers from different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) // add partial age sums
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) // add partial counts
  }

  // Compute the final result: total age / count.
  // Convert to Double before dividing, otherwise integer division truncates.
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}
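Note that UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of the typed Aggregator API, which is registered for SQL via functions.udaf. A minimal sketch of the same average, assuming Spark 3.x (AvgBuffer and MyAvg are illustrative names):

import org.apache.spark.sql.{Encoder, Encoders, functions}
import org.apache.spark.sql.expressions.Aggregator

// (sum, count) accumulator for the running average
case class AvgBuffer(sum: Long, count: Long)

object MyAvg extends Aggregator[Long, AvgBuffer, Double] {
  override def zero: AvgBuffer = AvgBuffer(0L, 0L)
  override def reduce(b: AvgBuffer, age: Long): AvgBuffer = AvgBuffer(b.sum + age, b.count + 1)
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)
  override def finish(b: AvgBuffer): Double = b.sum.toDouble / b.count
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register for SQL use, then call it exactly like myavg above
spark.udf.register("myavg2", functions.udaf(MyAvg))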
UDTF
udtf.txt
01//zs//Hadoop scala spark hive hbase
02//ls//Hadoop scala kafka hive hbase Oozie
03//ww//Hadoop scala spark hive sqoop
package udf

import java.util

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object UDTFDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("UDFFunction2")
    val sc = SparkContext.getOrCreate(conf)
    // Hive support is required: the UDTF is registered through Hive's CREATE FUNCTION
    val spark = SparkSession.builder()
      .config("hive.metastore.uris", "thrift://192.168.XXX.100:9083")
      .master("local[*]").appName("utd")
      .enableHiveSupport().getOrCreate()

    val info = sc.textFile("in/udtf.txt")
    import spark.implicits._
    val infoDF = info.map(x => x.split("//")).filter(x => x(1).equals("ls"))
      .map(x => (x(0), x(1), x(2))).toDF("id", "name", "class")
    infoDF.createOrReplaceTempView("table_udtf")
    // infoDF.printSchema()
    // infoDF.show()
    // spark.sql("select * from table_udtf").show(false)

    // Register the UDTF as a temporary function
    spark.sql("create temporary function myudtf as 'udf.myUDTF'")
    // Use the custom function
    spark.sql("select myudtf(class) from table_udtf").show()
  }
}
class myUDTF extends GenericUDTF {
  // Initialize: declare the output columns and their types
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    val fieldName = new util.ArrayList[String]()
    val fieldOIS = new util.ArrayList[ObjectInspector]()
    // A single output column named "type" of type String
    fieldName.add("type")
    fieldOIS.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldOIS)
  }

  /* Process one input row.
   * Input:  Hadoop scala spark hive hbase
   * Output (one row per word, column "type" of type String):
   *   Hadoop
   *   scala
   *   spark
   *   hive
   *   hbase
   */
  override def process(objects: Array[AnyRef]): Unit = {
    val strs = objects(0).toString.split(" ") // split the string on spaces into an array of words
    for (str <- strs) {
      val temp = new Array[String](1)
      temp(0) = str
      forward(temp) // emit one output row per word
    }
  }

  // Release resources
  override def close(): Unit = {}
}
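If a Hive metastore is not available, the same one-to-many expansion can usually be done with the built-in explode and split functions, with no custom class or CREATE FUNCTION at all. A minimal sketch:

// Built-in alternative to the UDTF: one output row per course
spark.sql("select explode(split(class, ' ')) as type from table_udtf").show()

// Or equivalently in the DataFrame API
import org.apache.spark.sql.functions.{explode, split}
infoDF.select(explode(split($"class", " ")).as("type")).show()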