Spark——DataFrame与RDD互操作方式
Posted aishanyishi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark——DataFrame与RDD互操作方式相关的知识,希望对你有一定的参考价值。
一.引言
Spark SQL支持两种不同的方法将现有RDD转换为数据集。
1.第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以使代码更简洁,并且在编写Spark应用程序时已经了解了模式,因此可以很好地工作。详细资料参考 DataFrame与RDD互操作之反射
在开始之前现在项目的根路径下创建一个infos.txt文件,里面插入下面这种数据,以便进行学习
1,李国辉,26
2,华华,23
3,ligh,24
package com.spark import org.apache.spark.sql.SparkSession object DataFrameRDDAPP { def main(args: Array[String]): Unit = { val path="E:\data\infos.txt" val spark =SparkSession.builder().appName("DataFrameRDDAPP").master("local[2]").getOrCreate() //RDD==>DataFrame val rdd=spark.sparkContext.textFile(path) //注意导入隐式转换 import spark.implicits._ val infoDF=rdd.map(_.split(",")).map(line=>Info(line(0).toInt,line(1),line(2).toInt)).toDF() infoDF.show() infoDF.filter(infoDF.col("age")>19).show() infoDF.createOrReplaceTempView("infos") spark.sql("select *from infos where age>10").show() spark.stop() } case class Info(id:Int ,name:String ,age:Int) }
2.创建数据集的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有RDD。虽然此方法更详细,但它允许您在直到运行时才知道列及其类型时构造数据集。
DataFrame则可以通过三个步骤以编程方式创建。
1)Row从原始RDD 创建s的RDD;
2)创建由StructType匹配Row步骤1中创建的RDD中的s 结构 表示的模式。
3)Row通过createDataFrame提供的方法将模式应用于s 的RDD SparkSession。
package com.spark import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{StringType, StructField, StructType} /** * convert rdd to dataframe 2 * * @param spark */ object DataFrameRDDAPP2 { val spark =SparkSession.builder().appName("DataFrameRDDAPP").master("local[2]").getOrCreate() // 1.转成RDD val rdd = spark.sparkContext.textFile("E:\data\spark\infos.txt") // 2.定义schema,带有StructType的 // 定义schema信息 val schemaString = "name age" // 对schema信息按空格进行分割 // 最终fileds里包含了2个StructField val fields = schemaString.split(" ") // 字段类型,字段名称判断是不是为空 .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // 3.把我们的schema信息作用到RDD上 // 这个RDD里面包含了一些行 // 形成Row类型的RDD val rowRDD = rdd.map(_.split(",")) .map(x => Row(x(1), x(2).trim)) // 通过SparkSession创建一个DataFrame // 传进来一个rowRDD和schema,将schema作用到rowRDD上 val peopleDF = spark.createDataFrame(rowRDD, schema) peopleDF.show() }
以上是关于Spark——DataFrame与RDD互操作方式的主要内容,如果未能解决你的问题,请参考以下文章
sparkSQL中RDD——DataFrame——DataSet的区别
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池