spark sql 之 RDD与DataFrame互相转化
Posted 冬冬爱分享
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sql 之 RDD与DataFrame互相转化相关的知识,希望对你有一定的参考价值。
一、RDD转DataFrame
方法一:通过 case class 创建 DataFrames
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext object TestDataFrame { def main(args: Array[String]): Unit = { /** * 1、初始化 spark config */ val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local"); /** * 2、初始化spark context */ val sc = new SparkContext(conf); /** * 3、初始化spark sql context */ val ssc = new SQLContext(sc); /** * 4、做spark sql 的df获取工作 */ val PeopleRDD = sc.textFile("F:\input.txt").map(line => People(line.split(" ")(0),line.split(" ")(1).trim.toInt)) import ssc.implicits._ var df = PeopleRDD.toDF //将DataFrame注册成临时的一张表,这张表相当于临时注册到内存中,是逻辑上的表,不会物化到磁盘 这种方式用的比较多 df.registerTempTable("peopel") var df2 =ssc.sql("select * from peopel where age > 23")show() /** * 5、spark context 结束工作 */ sc.stop(); } } case class People(var name:String ,var age : Int)
方法二:通过 structType创建 DataFrames
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType} object TestDataFrame2{ def test2(): Unit = { /** * 1、初始化 spark config */ val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local"); /** * 2、初始化spark context */ val sc = new SparkContext(conf); /** * 3、初始化spark sql context */ val ssc = new SQLContext(sc); /** * 4、做spark sql 的df获取工作 */ val peopleRDD = sc.textFile("F:\input.txt")map(line => Row(line.split(" ")(0),line.split(" ")(1).trim().toInt)) // 创建 StructType 来定义结构 val structType : StructType = StructType( StructField("name",StringType,true):: StructField("age",IntegerType,true) ::Nil ); val df : DataFrame = ssc.createDataFrame(peopleRDD, structType); df.registerTempTable("peopel"); ssc.sql("select * from peopel").show(); /** * 5、spark context 结束工作 */ sc.stop(); } }
方法三:通过json创建 DataFream
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType} import org.apache.spark.sql.DataFrame object TestDataFrame2{ def test3() : Unit={ /** * 1、初始化 spark config */ val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local"); /** * 2、初始化spark context */ val sc = new SparkContext(conf); /** * 3、初始化spark sql context */ val ssc = new SQLContext(sc); /** * 4、做spark sql 的df获取工作 */ val df :DataFrame = ssc.read.json("F:\json.json") df.registerTempTable("people") ssc.sql("select * from people").show(); /** * 5、spark context 结束工作 */ sc.stop(); } }
二、RDD转DataFrame
df.rdd
以上是关于spark sql 之 RDD与DataFrame互相转化的主要内容,如果未能解决你的问题,请参考以下文章
类型不匹配;找到:org.apache.spark.sql.DataFrame 需要:org.apache.spark.rdd.RDD
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池