Spark---DataFrame
Posted Shall潇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark---DataFrame相关的知识,希望对你有一定的参考价值。
DataFrame (Spark 1.4+)
DataFrame = Dataset[Row]
类似传统数据的二维表格
在RDD基础上加入了Schema(数据结构信息)
DataFrame Schema支持嵌套数据类型
struct、map、array
提供更多类似SQL操作的API
DataFrame常用操作
package testrdd
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
object TestDataFrame {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("dateSet")
val sc = SparkContext.getOrCreate(conf)
val spark = SparkSession.builder().master("local[*]").appName("dataset").getOrCreate()
import spark.implicits._ //不能少
//第一种方式用RDD执行
val frame = spark.read.json("in/user.json")
frame.printSchema()
frame.select("age").show() //两种方式,一种String,一种Column
frame.select(frame("name")).show()
frame.select(frame("name"),(frame("age")+1).as("年龄")).show() //直接对查询字段操作,as :重命名
val ageFrame = frame.select(frame("age")>22).withColumnRenamed("(age > 22)","年龄").show() //改字段名的第1中方式
frame.withColumnRenamed("age","年龄").show() //改字段名的第2中方式
frame.withColumn("姓名",frame("name")).drop(frame("name")).show() //改字段名的第3中方式
frame.groupBy("age").count().show() //分组查询
frame.filter(frame("age")>22).show() //过滤
println(frame.first().toString()) //第一个行,返回的是Row对象
frame.take(2).foreach(x=>println(x.toString())) //前N行数据
frame.select("name","age").sample(false,0.1).show() //抽样
//第二种:用SQL执行
//frame 转成 视图进行操作 --- 进行sql语句操作
frame.createOrReplaceTempView("user")
spark.sql("select age,count(1) c from user group by age having c>1").show()
}
}
DataFrame自定义字段和与RDD转换
package testrdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/*
* DataFrame -> RDD 导包别导错了
* */
object TestDataFrame2 {
def main(args: Array[String]): Unit = {
//需要schema 和 Row --- 因为 Dataset[Row] = DataFrame
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkConf")
val sc = SparkContext.getOrCreate(conf)
val spark = SparkSession.builder().master("local[*]").appName("schema").getOrCreate()
val people = sc.textFile("in/people.txt")
val schemaString = "id name age" //根据数据自定义字段
//第一种方式 --- 只能是String类型
import spark.implicits._
val schema = StructType(schemaString.split(" ").map(x=>StructField(x,StringType,true))) //schema
val row:RDD[Row] = people.map(x=>x.split(" ")).map(x=>Row(x(0),x(1),x(2))) //Row
val peopleDF = spark.createDataFrame(row,schema)
peopleDF.printSchema() //可以看一下全部是String类型,因为在设置schema的时候,定死了
peopleDF.select("age").show()
peopleDF.createOrReplaceTempView("people") //可以转成视图 ,用 sql去处理数据
peopleDF.rdd.collect // DF -> RDD ,用算子去处理数据
//第二种方式 --- 自定义各个字段类型
val fields = Array(
StructField("id", IntegerType, false),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
)
val schema1 = StructType(fields)
val row1:RDD[Row] = people.map(_.split(" ")).map(x=>Row(x(0).toInt,x(1),x(2).toInt))
val peopledf = spark.createDataFrame(row1,schema1)
peopledf.printSchema()
peopledf.show()
}
}
以上是关于Spark---DataFrame的主要内容,如果未能解决你的问题,请参考以下文章