Spark 2.0 数据集与数据帧
Posted
技术标签:
【中文标题】Spark 2.0 数据集与数据帧【英文标题】:Spark 2.0 Dataset vs DataFrame 【发布时间】:2016-11-14 19:44:36 【问题描述】:从 spark 2.0.1 开始我有一些问题。我阅读了很多文档,但到目前为止找不到足够的答案:
两者有什么区别df.select("foo")
df.select($"foo")
我是否理解正确
myDataSet.map(foo.someVal)
是类型安全的,不会转换为 RDD
,但会保留在 DataSet 表示/没有额外开销(2.0.0 的性能明智)
所有其他命令,例如select, .. 只是语法糖。它们不是类型安全的,可以使用映射。如果没有 map 语句,我怎么能 df.select("foo")
类型安全?
为什么我应该使用 UDF / UADF 而不是地图(假设地图保留在数据集表示中)?
【问题讨论】:
有一个项目旨在为 Spark 提供更多类型安全性,同时保持高效执行路径:typelevel/frameless 【参考方案1】:df.select("foo")
和 df.select($"foo")
之间的区别在于签名。前一个取至少一个String
,后一个取零个或多个Columns
。除此之外没有实际区别。
myDataSet.map(foo.someVal)
类型检查,但由于任何Dataset
操作都使用RDD
对象,并且与DataFrame
操作相比,开销很大。我们来看一个简单的例子:
case class FooBar(foo: Int, bar: String)
val ds = Seq(FooBar(1, "x")).toDS
ds.map(_.foo).explain
== Physical Plan ==
*SerializeFromObject [input[0, int, true] AS value#123]
+- *MapElements <function1>, obj#122: int
+- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
+- LocalTableScan [foo#117, bar#118]
如您所见,此执行计划需要访问所有字段并且必须访问DeserializeToObject
。
没有。一般来说,其他方法不是语法糖,并且会生成明显不同的执行计划。例如:
ds.select($"foo").explain
== Physical Plan ==
LocalTableScan [foo#117]
与之前显示的计划相比,它可以直接访问列。与其说是 API 的限制,不如说是操作语义不同的结果。
如果没有 map 语句,我如何 df.select("foo") 类型安全?
没有这样的选择。虽然类型化列允许您将静态 Dataset
转换为另一个静态类型化 Dataset
:
ds.select($"bar".as[Int])
没有类型安全。还有一些其他尝试包括类型安全优化操作,like typed aggregations,但这个实验性 API。
为什么我应该使用 UDF / UADF 而不是地图
这完全取决于您。 Spark 中的每个分布式数据结构都有自己的优点和缺点(参见例如Spark UDAF with ArrayType as bufferSchema performance issues)。
就我个人而言,我发现静态类型的 Dataset
最没用:
不提供与 Dataset[Row]
相同的优化范围(尽管它们共享存储格式和一些执行计划优化,但它并不能完全受益于代码生成或堆外存储),也不能访问所有DataFrame
的分析能力。
类型化转换是黑盒,有效地为优化器创建分析障碍。例如,选择(过滤器)不能被推送到类型转换:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
== Physical Plan ==
*Filter (foo#133 = 1)
+- *Filter <function1>.apply
+- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
+- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
+- LocalTableScan [foo#133, bar#134]
相比:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan ==
*HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
+- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
+- *Filter (foo#133 = 1)
+- LocalTableScan [foo#133, bar#134]
这会影响谓词下推或投影下推等功能。
没有RDDs
那样灵活,仅原生支持一小部分类型。
as
方法转换Dataset
时,带有Encoders
的“类型安全”是有争议的。由于未使用签名对数据形状进行编码,因此编译器只能验证 Encoder
是否存在。
相关问题:
Perform a typed join in Scala with Spark Datasets Spark 2.0 DataSets groupByKey and divide operation and type safety【讨论】:
感谢您的这篇非常有用的帖子。你认为你可以详细说明关于编码器类型安全的最后一点。你的意思是编译器可以基于 Encoder 对你的操作进行类型检查,但是不能保证你读取的数据(比如从 parquet 文件中)确实匹配这个 schema? @allstar 差不多。as
应该比 as[U]: Dataset[U]
更多 as[U]: Try[Dataset[U]]
。就像asInstanceOf
... 但这只是冰山一角 - 很难保持在“可检查”路径中,具有复杂的管道(连接,Aggregators
...)【参考方案2】:
Spark Dataset
比 Spark Dataframe
更强大。小例子 - 您只能创建 Row
、Tuple
或任何原始数据类型的 Dataframe
,但 Dataset
也可以让您创建任何非原始类型的 Dataset
。即您可以从字面上创建对象类型的Dataset
。
例如:
case class Employee(id:Int,name:String)
Dataset[Employee] // is valid
Dataframe[Employee] // is invalid
【讨论】:
我可以知道拒绝我的答案的原因吗?我分享我的实践经验。你的回答是正确的,并不代表我的回答不正确。 我相信过去已经有人指出了这一点,但不知何故,评论已被删除 - 你在这里提出的这一点是无效的,因为你比较了不同类型的对象。Dataset
是一个类型构造函数。 DataFame
是一个类型,更具体地说,它是一个别名 Dataset[Row]
。您真正应该比较的是Dataset[Row]
与Dataset[U]
,其中不是Row
的子类。
@user6910411 完全同意。【参考方案3】:
DATAFRAME:DataFrame 是一种抽象,允许数据的架构视图。
案例类人(姓名:字符串,年龄:整数,地址:字符串)
定义类人
scala > val df = List (Person (“Sumanth”, 23, “BNG”)
DATAFRAME VS DATASET
DATASET:Data Set 是 Dataframe API 的扩展,是最新的抽象,它试图提供 RDD 和 Dataframe 的最佳性能。
【讨论】:
以上是关于Spark 2.0 数据集与数据帧的主要内容,如果未能解决你的问题,请参考以下文章
当我们尝试将巨大的 Pandas 数据帧(40-50 百万行)转换为 Spark 2.0 数据帧时如何提高性能