Spark---DataFrame

Posted Shall潇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark---DataFrame相关的知识,希望对你有一定的参考价值。

DataFrame (Spark 1.4+)
DataFrame = Dataset[Row]
类似传统数据的二维表格
在RDD基础上加入了Schema(数据结构信息)
DataFrame Schema支持嵌套数据类型
struct、map、array
提供更多类似SQL操作的API

DataFrame常用操作

package testrdd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object TestDataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("dateSet")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().master("local[*]").appName("dataset").getOrCreate()
    import spark.implicits._  					//不能少
    
	//第一种方式用RDD执行
    val frame = spark.read.json("in/user.json")
    frame.printSchema()
	frame.select("age").show()  //两种方式,一种String,一种Column
	frame.select(frame("name")).show()
	frame.select(frame("name"),(frame("age")+1).as("年龄")).show()   //直接对查询字段操作,as :重命名
	val ageFrame = frame.select(frame("age")>22).withColumnRenamed("(age > 22)","年龄").show()  //改字段名的第1中方式
	frame.withColumnRenamed("age","年龄").show()                        //改字段名的第2中方式
	frame.withColumn("姓名",frame("name")).drop(frame("name")).show()  //改字段名的第3中方式
	frame.groupBy("age").count().show()   //分组查询
	frame.filter(frame("age")>22).show()  //过滤
	println(frame.first().toString())    //第一个行,返回的是Row对象
	frame.take(2).foreach(x=>println(x.toString()))  //前N行数据
	frame.select("name","age").sample(false,0.1).show()  //抽样
	
	//第二种:用SQL执行
    //frame 转成 视图进行操作 --- 进行sql语句操作
    frame.createOrReplaceTempView("user")
    spark.sql("select age,count(1) c from user group by age having c>1").show()
  }
}

DataFrame自定义字段和与RDD转换

package testrdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}   
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/*
* DataFrame -> RDD   导包别导错了
* */

object TestDataFrame2 {
  def main(args: Array[String]): Unit = {

    //需要schema 和 Row --- 因为 Dataset[Row] = DataFrame

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkConf")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().master("local[*]").appName("schema").getOrCreate()
    val people = sc.textFile("in/people.txt")

    val schemaString = "id name age"   //根据数据自定义字段

    //第一种方式  --- 只能是String类型
    import spark.implicits._
    val schema = StructType(schemaString.split(" ").map(x=>StructField(x,StringType,true)))  //schema
    val row:RDD[Row] = people.map(x=>x.split(" ")).map(x=>Row(x(0),x(1),x(2)))               //Row

    val peopleDF = spark.createDataFrame(row,schema)
    peopleDF.printSchema()   //可以看一下全部是String类型,因为在设置schema的时候,定死了
    peopleDF.select("age").show()

    peopleDF.createOrReplaceTempView("people")   //可以转成视图 ,用 sql去处理数据
    peopleDF.rdd.collect    // DF -> RDD ,用算子去处理数据
    

    //第二种方式  --- 自定义各个字段类型
    val fields = Array(
      StructField("id", IntegerType, false),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    )
    val schema1 = StructType(fields)
    val row1:RDD[Row] = people.map(_.split(" ")).map(x=>Row(x(0).toInt,x(1),x(2).toInt))
    val peopledf = spark.createDataFrame(row1,schema1)

    peopledf.printSchema()
    peopledf.show()
  }
}

以上是关于Spark---DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame 聚合

spark dataFrame api操作

从 Redshift 读取到 Spark Dataframe(Spark-Redshift 模块)

使用列值作为 spark DataFrame 函数的参数

将 Spark DataFrame 转换为 Pojo 对象

如何验证 Spark Dataframe 的内容