Spark 2.0 数据集与数据帧

Posted

技术标签:

【中文标题】Spark 2.0 数据集与数据帧【英文标题】:Spark 2.0 Dataset vs DataFrame 【发布时间】:2016-11-14 19:44:36 【问题描述】:

从 spark 2.0.1 开始我有一些问题。我阅读了很多文档,但到目前为止找不到足够的答案:

两者有什么区别 df.select("foo") df.select($"foo") 我是否理解正确 myDataSet.map(foo.someVal) 是类型安全的,不会转换为 RDD,但会保留在 DataSet 表示/没有额外开销(2.0.0 的性能明智) 所有其他命令,例如select, .. 只是语法糖。它们不是类型安全的,可以使用映射。如果没有 map 语句,我怎么能 df.select("foo") 类型安全? 为什么我应该使用 UDF / UADF 而不是地图(假设地图保留在数据集表示中)?

【问题讨论】:

有一个项目旨在为 Spark 提供更多类型安全性,同时保持高效执行路径:typelevel/frameless 【参考方案1】:
    df.select("foo")df.select($"foo") 之间的区别在于签名。前一个取至少一个String,后一个取零个或多个Columns。除此之外没有实际区别。

    myDataSet.map(foo.someVal) 类型检查,但由于任何Dataset 操作都使用RDD 对象,并且与DataFrame 操作相比,开销很大。我们来看一个简单的例子:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    如您所见,此执行计划需要访问所有字段并且必须访问DeserializeToObject

    没有。一般来说,其他方法不是语法糖,并且会生成明显不同的执行计划。例如:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    与之前显示的计划相比,它可以直接访问列。与其说是 API 的限制,不如说是操作语义不同的结果。

    如果没有 map 语句,我如何 df.select("foo") 类型安全?

    没有这样的选择。虽然类型化列允许您将静态 Dataset 转换为另一个静态类型化 Dataset

    ds.select($"bar".as[Int])
    

    没有类型安全。还有一些其他尝试包括类型安全优化操作,like typed aggregations,但这个实验性 API。

    为什么我应该使用 UDF / UADF 而不是地图

    这完全取决于您。 Spark 中的每个分布式数据结构都有自己的优点和缺点(参见例如Spark UDAF with ArrayType as bufferSchema performance issues)。

就我个人而言,我发现静态类型的 Dataset 最没用:

不提供与 Dataset[Row] 相同的优化范围(尽管它们共享存储格式和一些执行计划优化,但它并不能完全受益于代码生成或堆外存储),也不能访问所有DataFrame 的分析能力。

类型化转换是黑盒,有效地为优化器创建分析障碍。例如,选择(过滤器)不能被推送到类型转换:

ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
== Physical Plan ==
*Filter (foo#133 = 1)
+- *Filter <function1>.apply
   +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
      +- Exchange hashpartitioning(foo#133, 200)
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
            +- LocalTableScan [foo#133, bar#134]

相比:

ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan ==
*HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
   +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
      +- *Filter (foo#133 = 1)
         +- LocalTableScan [foo#133, bar#134] 

这会影响谓词下推或投影下推等功能。

没有RDDs 那样灵活,仅原生支持一小部分类型。

当使用as 方法转换Dataset 时,带有Encoders 的“类型安全”是有争议的。由于未使用签名对数据形状进行编码,因此编译器只能验证 Encoder 是否存在。

相关问题:

Perform a typed join in Scala with Spark Datasets Spark 2.0 DataSets groupByKey and divide operation and type safety

【讨论】:

感谢您的这篇非常有用的帖子。你认为你可以详细说明关于编码器类型安全的最后一点。你的意思是编译器可以基于 Encoder 对你的操作进行类型检查,但是不能保证你读取的数据(比如从 parquet 文件中)确实匹配这个 schema? @allstar 差不多。 as 应该比 as[U]: Dataset[U] 更多 as[U]: Try[Dataset[U]]。就像asInstanceOf... 但这只是冰山一角 - 很难保持在“可检查”路径中,具有复杂的管道(连接,Aggregators...)【参考方案2】:

Spark Dataset 比 Spark Dataframe 更强大。小例子 - 您只能创建 RowTuple 或任何原始数据类型的 Dataframe,但 Dataset 也可以让您创建任何非原始类型的 Dataset。即您可以从字面上创建对象类型的Dataset

例如:

case class Employee(id:Int,name:String)

Dataset[Employee]   // is valid
Dataframe[Employee] // is invalid

【讨论】:

我可以知道拒绝我的答案的原因吗?我分享我的实践经验。你的回答是正确的,并不代表我的回答不正确。 我相信过去已经有人指出了这一点,但不知何故,评论已被删除 - 你在这里提出的这一点是无效的,因为你比较了不同类型的对象。 Dataset 是一个类型构造函数。 DataFame 是一个类型,更具体地说,它是一个别名 Dataset[Row]。您真正应该比较的是Dataset[Row]Dataset[U],其中不是Row 的子类。 @user6910411 完全同意。【参考方案3】:

DATAFRAME:DataFrame 是一种抽象,允许数据的架构视图。

案例类人(姓名:字符串,年龄:整数,地址:字符串)

定义类人

scala > val df = List (Person (“Sumanth”, 23, “BNG”)

DATAFRAME VS DATASET

DATASET:Data Set 是 Dataframe API 的扩展,是最新的抽象,它试图提供 RDD 和 Dataframe 的最佳性能。

【讨论】:

以上是关于Spark 2.0 数据集与数据帧的主要内容,如果未能解决你的问题,请参考以下文章

将数据集与 plotly 进行比较 - 用实线混合虚线

当我们尝试将巨大的 Pandas 数据帧(40-50 百万行)转换为 Spark 2.0 数据帧时如何提高性能

Spark 2.0 将 json 读入带有引号的数据帧中 - 与 spark 1.6 不同的行为......错误?

Spark 2.x 数据帧或数据集? [复制]

具有特征的 Spark 2.0 数据集编码器

Spark 2.0+ 即使数据帧被缓存,如果它的源之一发生变化,它会重新计算吗?