Spark --- Built-in Functions

Posted by Shall潇


Preface: this article, compiled by the editors at 小常识网 (cha138.com), introduces Spark's built-in functions; hopefully it offers some reference value.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * Built-in functions: computing daily PV and UV with count and countDistinct
 */

object SparkInnerFuncDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("innerFunction")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).config("hive.metastore.uris","thrift://192.168.159.100:9083")
      .enableHiveSupport().getOrCreate()

    // Simulated user access log entries (date, user id)
    val accessLog = Array(
      "2016-12-27,001",
      "2016-12-27,001",
      "2016-12-27,002",
      "2016-12-28,003",
      "2016-12-28,004",
      "2016-12-28,002",
      "2016-12-28,002",
      "2016-12-28,001"
    )
    val row = sc.parallelize(accessLog, 3).map(x => {
      val strs = x.split(",")
      Row(strs(0), strs(1).toInt)
    })
    val schema = StructType(Array(
      StructField("day", StringType),
      StructField("userid", IntegerType)
    ))
    val frame = spark.createDataFrame(row, schema)   // create the DataFrame
//    frame.printSchema()
//    frame.show()
    import org.apache.spark.sql.functions._   // built-in functions such as count and countDistinct, used inside agg

    // Total visits per day (PV)
    frame.groupBy("day").agg(count("userid").as("pv"))
      .select("day","pv").show()

    // Distinct visitors per day (UV): a user who visits several times counts only once
    frame.groupBy("day").agg(countDistinct("userid").as("uv"))
      .select("day","uv").show()
  }
}
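
For comparison, the same PV/UV statistics can be expressed in Spark SQL once the DataFrame is registered as a temporary view. The lines below are a minimal sketch that would sit at the end of the main method above; the view name access_log is an assumption chosen for this example.

    // A minimal sketch (assumed view name "access_log"): the same PV/UV via Spark SQL
    frame.createOrReplaceTempView("access_log")
    // PV: total visits per day
    spark.sql("SELECT day, COUNT(userid) AS pv FROM access_log GROUP BY day").show()
    // UV: distinct visitors per day
    spark.sql("SELECT day, COUNT(DISTINCT userid) AS uv FROM access_log GROUP BY day").show()

The second example builds a DataFrame from case-class objects and walks through grouping, aggregate expressions, and sorting.
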
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

case class Student(id: Integer, name: String, gender: String, age: Integer)   // row type used to build the student DataFrame

object InnerFuncDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("innerFunction2")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val students = Seq(
      Student(1, "zhangsan", "F", 20),
      Student(2, "lisi", "M", 24),
      Student(3, "wangwu", "M", 23),
      Student(4, "zhaoliu", "F", 21),
      Student(5, "qianqi", "F", 45),
      Student(6, "sunba", "F", 39),
      Student(7, "zhoujiu", "F", 43),
      Student(8, "wuhuang", "M", 29),
      Student(9, "zhengtan", "F", 24),
      Student(10, "xujiao", "M", 22),
      Student(11, "hanxing", "F", 24)
    )

    val stuDF = spark.createDataFrame(students)
//    stuDF.printSchema()
//    stuDF.show()

    // Average age of all students
//    stuDF.agg(avg("age")).show()

    // Average age per gender
//    stuDF.groupBy("gender").agg(avg("age").as("avgAge"))
//      .select("gender","avgAge").show()

    // Aggregation via (column -> function name) expression pairs
//    stuDF.groupBy("gender").agg("age"->"avg","age"->"max","age"->"min").show()

    // Multiple grouping columns
//      stuDF.groupBy("gender","age").count().show()

    // Sorting
//    stuDF.sort("age").show()   // ascending by default
//    stuDF.sort($"age".desc).show()  // descending

    // Group by gender, compute the average age, and sort by average age descending.
    // orderBy performs a global sort; sortWithinPartitions would only order rows inside each partition.
    stuDF.groupBy("gender").agg(avg("age").as("avgAge")).orderBy($"avgAge".desc).show()
  }
}
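
The ("age" -> "avg") expression style commented out above also has a typed equivalent built from the functions imported via org.apache.spark.sql.functions, which makes aliasing each result column straightforward. A minimal sketch follows; the aliases avgAge/maxAge/minAge/cnt are illustrative, not part of the original example.

// A minimal sketch: several aggregations in one agg call, with explicit aliases
stuDF.groupBy("gender").agg(
  avg("age").as("avgAge"),    // average age per gender
  max("age").as("maxAge"),    // oldest student in the group
  min("age").as("minAge"),    // youngest student in the group
  count("id").as("cnt")       // number of students in the group
).orderBy($"avgAge".desc).show()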
