Spark之UDF
Posted by yszd
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之UDF相关的知识,希望对你有一定的参考价值。
package big.data.analyse.udfudaf

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by zhen on 2018/11/25.
  *
  * Small demo of registering Scala functions as Spark SQL UDFs and calling
  * them from a SQL query against an in-memory DataFrame.
  */
object SparkUdfUdaf {

  /**
    * Predicate registered below as the SQL UDF "isAdult".
    *
    * @param age value to test
    * @return true only when `age` is strictly greater than 18
    */
  // NOTE(review): the comparison is strict, so 18 itself is rejected — confirm
  // that this is the intended threshold.
  def isAdult(age: Int): Boolean = age > 18

  /**
    * Builds a local SparkSession, loads a few hard-coded CSV-like records into a
    * DataFrame, registers two UDFs and prints the rows selected by a SQL query
    * that uses both of them.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("UdfUdaf")
      .master("local[2]")
      .getOrCreate()

    try {
      val userData = Array(
        "2015,11,www.baidu.com",
        "2016,14,www.google.com",
        "2017,13,www.apache.com",
        "2015,21,www.spark.com",
        "2016,32,www.hadoop.com",
        "2017,18,www.solr.com",
        "2017,14,www.hive.com"
      )

      // Distribute the raw records as an RDD.
      val userDataRDD = spark.sparkContext.parallelize(userData)

      // Split every record into its three comma-separated fields; the second
      // field is parsed as an Int so that it matches the schema below.
      val userDataType = userDataRDD.map { line =>
        val Array(age, id, url) = line.split(",")
        Row(age, id.toInt, url)
      }

      // NOTE(review): the column called "age" receives the first field (values
      // such as 2015) and "id" receives the second one, which is the value the
      // query passes to isAdult — the two names look swapped; confirm.
      val structTypes = StructType(Array(
        StructField("age", StringType, true),
        StructField("id", IntegerType, true),
        StructField("url", StringType, true)
      ))

      // Convert the RDD of Rows into a DataFrame.
      val userDataFrame = spark.createDataFrame(userDataType, structTypes)

      // Register the DataFrame as a temporary view so that SQL can reference it.
      userDataFrame.createOrReplaceTempView("udf")

      // Register a UDF, style 1: anonymous function.
      spark.udf.register("getLength", (str: String) => str.length)

      // Register a UDF, style 2: existing method converted to a function value.
      spark.udf.register("isAdult", isAdult _)

      // Run the SQL query that uses both UDFs.
      val sql = "select * from udf where getLength(udf.url)=13 and isAdult(udf.id)"
      val result = spark.sql(sql)

      // Bring the (small) result to the driver before printing: a println inside
      // Dataset.foreach runs in the executor tasks, not in the driver program.
      result.collect().foreach(println)
    } finally {
      // Always release the session, even when the job fails.
      spark.stop()
    }
  }
}
结果:
[2015,21,www.spark.com]
以上是关于Spark之UDF的主要内容,如果未能解决你的问题,请参考以下文章