Spark --- Built-in Functions
Posted by Shall潇
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * Built-in functions
 * */
object SparkInnerFuncDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("innerFunction")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).config("hive.metastore.uris", "thrift://192.168.159.100:9083")
      .enableHiveSupport().getOrCreate()

    // Simulated user access log: each entry is "date,userid"
    val accessLog = Array(
      "2016-12-27,001",
      "2016-12-27,001",
      "2016-12-27,002",
      "2016-12-28,003",
      "2016-12-28,004",
      "2016-12-28,002",
      "2016-12-28,002",
      "2016-12-28,001"
    )

    val row = sc.parallelize(accessLog, 3).map(x => {
      val strs = x.split(",")
      Row(strs(0), strs(1).toInt)
    })

    val schema = StructType(Array(
      StructField("day", StringType),
      StructField("userid", IntegerType)
    ))

    val frame = spark.createDataFrame(row, schema) // build the DataFrame
    // frame.printSchema()
    // frame.show()

    import org.apache.spark.sql.functions._ // brings built-in functions such as count and countDistinct into scope

    // Page views per day (PV): every access counts
    frame.groupBy("day").agg(count("userid").as("pv"))
      .select("day", "pv").show()

    // Unique visitors per day (UV): a user who visits multiple times counts only once
    frame.groupBy("day").agg(countDistinct("userid").as("uv"))
      .select("day", "uv").show()
  }
}
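The same PV/UV statistics can also be expressed in plain SQL by registering the DataFrame as a temporary view. A minimal sketch, assuming the `frame` DataFrame built above is in scope (the view name `access_log` is chosen here purely for illustration):

    // Register the DataFrame as a temp view, then query it with spark.sql
    frame.createOrReplaceTempView("access_log")

    // PV: total accesses per day
    spark.sql("SELECT day, COUNT(userid) AS pv FROM access_log GROUP BY day").show()

    // UV: distinct users per day
    spark.sql("SELECT day, COUNT(DISTINCT userid) AS uv FROM access_log GROUP BY day").show()

Both forms compile down to the same plans, so choosing between the DataFrame DSL and SQL strings is mostly a matter of taste.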
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

case class Student(id: Integer, name: String, gender: String, age: Integer)

object InnerFuncDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("innerFunction2")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val students = Seq(
      Student(1, "zhangsan", "F", 20),
      Student(2, "lisi", "M", 24),
      Student(3, "wangwu", "M", 23),
      Student(4, "zhaoliu", "F", 21),
      Student(5, "qianqi", "F", 45),
      Student(6, "sunba", "F", 39),
      Student(7, "zhoujiu", "F", 43),
      Student(8, "wuhuang", "M", 29),
      Student(9, "zhengtan", "F", 24),
      Student(10, "xujiao", "M", 22),
      Student(11, "hanxing", "F", 24)
    )

    val stuDF = spark.createDataFrame(students)
    // stuDF.printSchema()
    // stuDF.show()

    // Overall average age
    // stuDF.agg(avg("age")).show()

    // Average age per gender
    // stuDF.groupBy("gender").agg(avg("age").as("avgAge"))
    //   .select("gender", "avgAge").show()

    // Expression-style aggregation: column -> aggregate-function pairs
    // stuDF.groupBy("gender").agg("age" -> "avg", "age" -> "max", "age" -> "min").show()

    // Multiple grouping columns
    // stuDF.groupBy("gender", "age").count().show()

    // Sorting
    // stuDF.sort("age").show()        // ascending by default
    // stuDF.sort($"age".desc).show()  // descending

    // Group by gender, compute the average age, then sort by average age descending.
    // orderBy gives a global sort; sortWithinPartitions would only order rows inside each partition.
    stuDF.groupBy("gender").agg(avg("age").as("avgAge")).orderBy($"avgAge".desc).show()
  }
}
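Built-in functions also cover window operations. A minimal sketch, assuming the `stuDF` and `import spark.implicits._` from the example above are in scope, that ranks students from oldest to youngest within each gender using row_number:

    import org.apache.spark.sql.expressions.Window

    // Define a window partitioned by gender, ordered by age descending
    val byGender = Window.partitionBy("gender").orderBy($"age".desc)

    // row_number assigns 1, 2, 3, ... within each gender partition
    stuDF.withColumn("rank", row_number().over(byGender)).show()

Unlike groupBy, a window function keeps every input row and simply attaches the computed value, which is handy for per-group top-N queries.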