Spark RDD转换成DataFrame的两种方式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark RDD转换成DataFrame的两种方式相关的知识,希望对你有一定的参考价值。

Spark SQL支持两种方式将现有RDD转换为DataFrame。
第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。
第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet

    方法如下
         1.将RDD转换成Rows   
         2.按照第一步Rows的结构定义StructType  
         3.基于rows和StructType使用createDataFrame创建相应的DF

测试数据为order.data

1   小王  电视  12  2015-08-01 09:08:31
1   小王  冰箱  24  2015-08-01 09:08:14
2   小李  空调  12  2015-09-02 09:01:31

代码如下:

object RDD2DF {

  /**
    * 主要有两种方式
    *   第一种是在已经知道schema已经知道的情况下,我们使用反射把RDD转换成DS,进而转换成DF
    *   第二种是你不能提前定义好case class,例如数据的结构是以String类型存在的。我们使用接口自定义一个schema
    * @param args
    */
  def main(args: Array[String]): Unit = {

    val spark=SparkSession.builder()
      .appName("DFDemo")
      .master("local[2]")
      .getOrCreate()

//    rdd2DFFunc1(spark)

    rdd2DFFunc2(spark)
    spark.stop()
  }

  /**
    * 提前定义好case class
    * @param spark
    */
  def rdd2DFFunc1(spark:SparkSession): Unit ={
    import spark.implicits._
    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")
    val orderDF=orderRDD.map(_.split("\t"))
      .map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
      .toDF()
    orderDF.show()
    Thread.sleep(1000000)
  }

  /**
    *总结:第二种方式就是通过最基础的DF接口方法,将
    * @param spark
    */
  def rdd2DFFunc2(spark:SparkSession): Unit ={
    //TODO:   1.将RDD转换成Rows   2.按照第一步Rows的结构定义StructType  3.基于rows和StructType使用createDataFrame创建相应的DF
    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")

    //TODO:   1.将RDD转换成Rows
    val rowsRDD=orderRDD
//      .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res})
      .map(_.split("\t"))
      .map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4)))

    //TODO:   2.按照第一步Rows的结构定义StructType
    val schemaString="id|name|commodity|age|date"
    val fields=schemaString.split("\\|")
      .map(filedName=>StructField(filedName,StringType,nullable = true))
    val schema=StructType(fields)

    //TODO:   3.基于rows和StructType使用createDataFrame创建相应的DF
   val orderDF= spark.createDataFrame(rowsRDD,schema)
    orderDF.show()
    orderDF.groupBy("name").count().show()
    orderDF.select("name","commodity").show()
    Thread.sleep(10000000)
  }
}
case class Order(id:String,name:String,commodity:String,age:String,date:String)

以上是关于Spark RDD转换成DataFrame的两种方式的主要内容,如果未能解决你的问题,请参考以下文章

Spark中RDD转换成DataFrame的两种方式(分别用Java和Scala实现)

spark-sql将Rdd转换为DataFrame进行操作的两种方法

pyspark建立RDD以及读取文件成dataframe

RDD转换得到DataFrame

RDD转换得到DataFrame

Spark数据读取