RDD, DataFrame,DataSet区别与相互转化

Posted bitcarmanlee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD, DataFrame,DataSet区别与相互转化相关的知识,希望对你有一定的参考价值。

1.三者的区别

1.1 RDD

spark中提供了RDD, DataFrame,DataSet三组API(本质是两组),那么这三组API有啥区别,下面我们来进行一下剖析。
RDD(Resilient Distributed Dataset)是spark中最早出现的api,从一开始 RDD 就是 Spark 提供的面向用户的主要 API。从根本上来说,一个 RDD 就是你的数据的一个不可变的分布式元素集合,在集群中跨节点分布,可以通过若干提供了转换和处理的底层 API 进行并行处理。

那么我们在什么情况下使用RDD会比较合适?
1.你希望可以对你的数据集进行最基本的转换、处理和控制;
2.你的数据是非结构化的,比如流媒体或者字符流;
3.你想通过函数式编程而不是特定领域内的表达来处理你的数据;
4.你不希望像进行列式处理一样定义一个模式,通过名字或字段来处理或访问数据属性;
5.你并不在意通过 DataFrame 和 Dataset 进行结构化和半结构化数据处理所能获得的一些优化和性能上的好处;
(上面内容来自参考文献1)

1.2 DataFrame&DataSet

与 RDD 相似, DataFrame 也是数据的一个不可变分布式集合。但与 RDD 不同的是,数据都被组织到有名字的列中,就像关系型数据库中的表一样。设计 DataFrame 的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。它提供了特定领域内专用的 API 来处理你的分布式数据,并让更多的人可以更方便地使用 Spark,而不仅限于专业的数据工程师。

从 Spark 2.0 开始,Dataset 开始具有两种不同类型的 API 特征:有明确类型的 API 和无类型的 API。从概念上来说,你可以把 DataFrame 当作一些通用对象 Dataset[Row] 的集合的一个别名,而一行就是一个通用的无类型的 JVM 对象。与之形成对比,Dataset 就是一些有明确类型定义的 JVM 对象的集合,通过你在 Scala 中定义的 Case Class 或者 Java 中的 Class 来指定。

该什么时候使用 DataFrame 或 Dataset 呢?
1.如果你需要丰富的语义、高级抽象和特定领域专用的 API,那就使用 DataFrame 或 Dataset;
2.如果你的处理需要对半结构化数据进行高级处理,如 filter、map、aggregation、average、sum、SQL 查询、列式访问或使用 lambda 函数,那就使用 DataFrame 或 Dataset;
3.如果你想在编译时就有高度的类型安全,想要有类型的 JVM 对象,用上 Catalyst 优化,并得益于 Tungsten 生成的高效代码,那就使用 Dataset;
4.如果你想在不同的 Spark 库之间使用一致和简化的 API,那就使用 DataFrame 或 Dataset;
5.如果你是 R 语言使用者,就用 DataFrame;
6.如果你是 Python 语言使用者,就用 DataFrame,在需要更细致的控制时就退回去使用 RDD;

以上内容来自参考文献1,如果有需要可以直接看原文。

2. RDD与DataFrame相互转化

下面我们通过代码来演示RDD与DataFrame的相互转化。

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType
import org.apache.spark.sql.Row
import org.junit.Test

case class ExampleClass(name: String, num: Int) extends Serializable

class T7 

  @Test
  def t1() = 
    val sparkConf = new SparkConf().setMaster("local[2]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val seq = Seq(("a", 1), ("b", 2), ("c", 3))
    val df = spark.sparkContext.parallelize(seq, 1)
      .toDF("name", "num")

    val result = df.agg(max("name") as "maxname", max("num") as "maxnum")
    result.show()
  

  @Test
  def t2() = 
    val sparkConf = new SparkConf().setMaster("local[2]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import scala.collection.JavaConversions._

    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("num", IntegerType, true)
    ))
    val seq = Seq(Row("a", 1), Row("b", 2), Row("c", 3))
    spark.createDataFrame(seq, schema)
      .rdd
      .map(x => 
        val name = x.getAs[String]("name")
        val num = x.getAs[Int]("num")
        (name, num)
      )
      .take(3)
      .foreach(println)
  

对于RDD转DataFrame,只需要toDF即可。
而对于DataFrame转RDD,直接调用.rdd方法就行。

3.RDD转DataSet

  @Test
  def t3() = 
    val sparkConf = new SparkConf().setMaster("local[2]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val seq = Seq(("a", 1), ("b", 2), ("c", 3))
    val ds = spark.sparkContext.parallelize(seq, 1)
      .map x => ExampleClass(x._1, x._2) 
      .toDS

    val result = ds.agg(max("name") as "maxname", max("num") as "maxnum")
    result.show(10)
  

想要将RDD转化为DataSet,只需要配合case class,调用toDS方法。

4.DataFrame与DataSet相互转化

  @Test
  def t4() = 
    val sparkConf = new SparkConf().setMaster("local[2]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val seq = Seq(("a", 1), ("b", 2), ("c", 3))
    val df = spark.sparkContext.parallelize(seq, 1)
      .toDF("name", "num")

    val ds = df.as[ExampleClass]
    ds.select("name")
      .show(10)
  

  @Test
  def t5() = 
    val sparkConf = new SparkConf().setMaster("local[2]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val seq = Seq(("a", 1), ("b", 2), ("c", 3))
    val ds = spark.sparkContext.parallelize(seq, 1)
      .map x => ExampleClass(x._1, x._2) 
      .toDS

    val df = ds.toDF
    df.show(10)
  

DataFrame转DataSet,配合case class 直接用as[YourClass]即可。
而将DataSet转成DataFrame,用toDF方法就行。

参考文献:
1.https://www.infoq.cn/article/three-apache-spark-apis-rdds-dataframes-and-datasets/

以上是关于RDD, DataFrame,DataSet区别与相互转化的主要内容,如果未能解决你的问题,请参考以下文章

RDD,DataFrame和DataSet的区别

Spark中RDDDataFrame和DataSet的区别与联系

Spark中RDDDataFrame和DataSet的区别与联系

Spark中RDDDataFrame和DataSet的区别与联系

Spark中RDDDataFrame和DataSet的区别与联系

Spark RDD,DataFrame和DataSet的区别