Spark成长之路(13)-DataSet与DataFrame

Posted Q博士

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark成长之路(13)-DataSet与DataFrame相关的知识,希望对你有一定的参考价值。

Datasets and DataFrames

前言

spark1.6中引入了DataSet和DataFrame的概念,然后Spark SQL的API也是基于这两个概念的,到2.2推出的稳定版本的Structured Streaming也是依靠Spark SQL的API,到Spark MLib也开始由RDD的API转换为DataFrame的API,这一步步的动作表示,未来的Spark根基是DataSet。所以也就引出了这篇文章,作者意图把DataSet认真的了解一下。

源码

我们先来看看这两个对象的源码。

DataFrame

package object sql 
  ...
  type DataFrame = Dataset[Row]

可以看出DataFrameDataset的特例,所以我先了解Dataset这个接口。

Dataset


private[sql] object Dataset 
  def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = 
    new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
  

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = 
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  



class Dataset[T] private[sql](
    @transient val sparkSession: SparkSession,
    @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
    encoder: Encoder[T])
  extends Serializable 

Dataset类似于RDD,但是,不同于RDD的是,Dataset有特殊的编码器去序列化Jvm对象以及网络数据,开发过spark程序的都知道,之前RDD一般会使用kryo或者java序列化器来做这些工作。序列化是把对象序列化成二进制,Dataset可以直接操作二进制数据,进行相关的转换操作,这比RDD要强大。
因为Dataset是强类型数据集,使用时必须说明数据类型。

创建dataset

case class Person(name: String, age: Int)
  def main(args: Array[String]): Unit = 
    import spark.implicits._
    val caseClassDF = Seq(Person("Andy", 32)).toDS
    caseClassDF.show()
  

上面代码会自动把数据映射为一张表,表的列名分别是nameage。我们来看一下执行结果:

+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+

可以看出来,sparksession自动将一个Person对象作为表中一条数据。

如果我们不用Person包装呢,代码修改成如下:

> val caseClassDF = Seq("Andy", "doctorq").toDS
+-------+
|  value|
+-------+
|   Andy|
|doctorq|
+-------+

默认给出一个列名”value”,每一条数据当成一个记录。如果一行数据有多个属性,还是用对象包装一下比较方便。

读取json串

val path = "src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()

SparkSession提供的读取外部文件的接口非常的简单。

RDD转换为Dataset

val peopleDF = spark.sparkContext
      .textFile("src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDS()

总结

得到DataSet对象的方式有很多,普通的集合,读取外部数据,都可以很方便的转换为DS,但是要记得引入隐式转换。import spark.implicits._,但是有一点就是需要指定类型。

DataFrame

DataFrame为Dataset类型为Row对象,我们来看一下Row对象:

定义了行中的元素个数,数据类型schema,用来定义属性类型。这个Row也类似一个集合类型,里面定义了集合的大小,以及每个元素的类型,还定义了一些获取某个位置的值。

我们在之前说Dataset之前,都说了创建Dataset都要指定数据类型,而DataFrame是特殊的,只需要用Row太泛指,其他交给Row来处理,这就把一些无法定义对象的,用DF来代替了。

总结

Dataset作为新版本的spark基础数据集,未来还将逐渐替代RDD,正式成为底层支持,目前的流式计算和机器学习库的转移,都是这方面的信号。之前看文章说Dataset是继承RDD的,但实际上是独立存在的,不过可以由RDD转换而来。

以上是关于Spark成长之路(13)-DataSet与DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

Spark成长之路-Hypothesis testing

Spark成长之路(10)-CountVectorizer

Spark成长之路-TFIDF

Spark成长之路(11)-ngram

Spark成长之路-消息队列

spark成长之路spark究竟是什么?