Spark成长之路(13)-DataSet与DataFrame
Posted Q博士
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark成长之路(13)-DataSet与DataFrame相关的知识,希望对你有一定的参考价值。
前言
spark1.6中引入了DataSet和DataFrame的概念,然后Spark SQL的API也是基于这两个概念的,到2.2推出的稳定版本的Structured Streaming也是依靠Spark SQL的API,到Spark MLib也开始由RDD的API转换为DataFrame的API,这一步步的动作表示,未来的Spark根基是DataSet。所以也就引出了这篇文章,作者意图把DataSet认真的了解一下。
源码
我们先来看看这两个对象的源码。
DataFrame
package object sql
...
type DataFrame = Dataset[Row]
可以看出DataFrame
是Dataset
的特例,所以我先了解Dataset这个接口。
Dataset
private[sql] object Dataset
def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] =
new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
class Dataset[T] private[sql](
@transient val sparkSession: SparkSession,
@DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
encoder: Encoder[T])
extends Serializable
Dataset类似于RDD,但是,不同于RDD的是,Dataset有特殊的编码器去序列化Jvm对象以及网络数据,开发过spark程序的都知道,之前RDD一般会使用kryo或者java序列化器来做这些工作。序列化是把对象序列化成二进制,Dataset可以直接操作二进制数据,进行相关的转换操作,这比RDD要强大。
因为Dataset是强类型数据集,使用时必须说明数据类型。
创建dataset
case class Person(name: String, age: Int)
def main(args: Array[String]): Unit =
import spark.implicits._
val caseClassDF = Seq(Person("Andy", 32)).toDS
caseClassDF.show()
上面代码会自动把数据映射为一张表,表的列名分别是name
和age
。我们来看一下执行结果:
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
可以看出来,sparksession
自动将一个Person
对象作为表中一条数据。
如果我们不用Person
包装呢,代码修改成如下:
> val caseClassDF = Seq("Andy", "doctorq").toDS
+-------+
| value|
+-------+
| Andy|
|doctorq|
+-------+
默认给出一个列名”value”,每一条数据当成一个记录。如果一行数据有多个属性,还是用对象包装一下比较方便。
读取json串
val path = "src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
SparkSession
提供的读取外部文件的接口非常的简单。
RDD转换为Dataset
val peopleDF = spark.sparkContext
.textFile("src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDS()
总结
得到DataSet
对象的方式有很多,普通的集合,读取外部数据,都可以很方便的转换为DS
,但是要记得引入隐式转换。import spark.implicits._
,但是有一点就是需要指定类型。
DataFrame
DataFrame为Dataset类型为Row对象,我们来看一下Row对象:
定义了行中的元素个数,数据类型schema,用来定义属性类型。这个Row
也类似一个集合类型,里面定义了集合的大小,以及每个元素的类型,还定义了一些获取某个位置的值。
我们在之前说Dataset
之前,都说了创建Dataset
都要指定数据类型,而DataFrame
是特殊的,只需要用Row
太泛指,其他交给Row
来处理,这就把一些无法定义对象的,用DF
来代替了。
总结
Dataset
作为新版本的spark
基础数据集,未来还将逐渐替代RDD
,正式成为底层支持,目前的流式计算和机器学习库的转移,都是这方面的信号。之前看文章说Dataset
是继承RDD
的,但实际上是独立存在的,不过可以由RDD
转换而来。
以上是关于Spark成长之路(13)-DataSet与DataFrame的主要内容,如果未能解决你的问题,请参考以下文章