数据集上的地图功能是不是针对一列的操作进行了优化?

Posted

技术标签:

【中文标题】数据集上的地图功能是不是针对一列的操作进行了优化?【英文标题】:Is map function on Datasets optimized for operations on one column?数据集上的地图功能是否针对一列的操作进行了优化? 【发布时间】:2016-08-29 22:17:11 【问题描述】:

对于DataFrame,使用udfdf.withColumn("newCol", myUDF("someCol")) 很容易通过一些操作生成一个新列。要在Dataset 中做这样的事情,我想我会使用map 函数:

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]

您必须将整个案例类 T 作为输入传递给函数。如果Dataset[T] 有很多字段/列,那么如果您只是想通过对T 的众多列之一进行操作来制作一个额外的列,那么传递整行似乎非常低效。我的问题是,Catalyst 是否足够聪明,能够对此进行优化?

【问题讨论】:

【参考方案1】:

Catalyst 是否足够聪明,能够对此进行优化?

tl;dr 不。见SPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressions。

目前 Spark SQL 的 Catalyst Optimizer 无法知道您在 Scala 代码中执行的操作。

引用SPARK-14083:

Dataset API 的一大优势是类型安全,但由于严重依赖用户定义的闭包/lambda,因此会以性能为代价。这些闭包通常比表达式慢,因为我们可以更灵活地优化表达式(已知数据类型,无虚函数调用等)。在许多情况下,查看这些闭包的字节码并弄清楚它们要做什么实际上并不难。如果我们能理解它们,那么我们就可以将它们直接转化为 Catalyst 表达式,以实现更优化的执行。

甚至提到了你的情况:

df.map(_.name) // 等价于表达式col("name")

如您所见,它仍处于打开状态,我怀疑目前是否有人从事此工作。


您可以帮助 Spark Optimizer 的方法是 select 那一列,然后才使用带有单参数 UDF 的 map 运算符。

这肯定符合您不将整个 JVM 对象传递给您的函数的要求,但不会摆脱这种从内部行表示到 Scala 对象的缓慢反序列化(这将落在 JVM 上并占用一些空间,直到GC 发生)。

【讨论】:

【参考方案2】:

我试图弄清楚自己,因为我在任何地方都找不到回应。

让我们有一个包含多个字段的案例类的数据集:

scala> case class A(x: Int, y: Int)
scala> val dfA = spark.createDataset[A](Seq(A(1, 2)))
scala> val dfX = dfA.map(_.x)

现在,如果我们检查优化后的计划,我们会得到以下信息:

scala> val plan = dfX.queryExecution.optimizedPlan

SerializeFromObject [input[0, int, true] AS value#8]
    +- MapElements <function1>, obj#7: int
        +- DeserializeToObject newInstance(class A), obj#6: A
           +- LocalRelation [x#2, y#3]    

根据更详细的plan.toJSONDeserializeToObject 步骤假定xy 都存在。

正如您所证明的那样,例如以下 sn-p 它使用反射而不是直接接触仍然有效的 A 的字段。

val dfX = dfA.map(
  _.getClass.getMethods.find(_.getName == "x").get.invoke(x).asInstanceOf[Int]
)

【讨论】:

以上是关于数据集上的地图功能是不是针对一列的操作进行了优化?的主要内容,如果未能解决你的问题,请参考以下文章

hive里的优化和高级功能

基于针对另一列的参考表更新 Pandas 数据框列的问题

如何在 Spark Java 中的数据集上应用地图功能

优化大数据集上的多对多关系查询

在大型数据集上使用 rpart 包

在shell编程中,怎样对每一行每一列的数据进行操作?