数据集上的地图功能是不是针对一列的操作进行了优化?
Posted
技术标签:
【中文标题】数据集上的地图功能是不是针对一列的操作进行了优化?【英文标题】:Is map function on Datasets optimized for operations on one column?数据集上的地图功能是否针对一列的操作进行了优化? 【发布时间】:2016-08-29 22:17:11 【问题描述】:对于DataFrame
,使用udf
和df.withColumn("newCol", myUDF("someCol"))
很容易通过一些操作生成一个新列。要在Dataset
中做这样的事情,我想我会使用map
函数:
def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
您必须将整个案例类 T
作为输入传递给函数。如果Dataset[T]
有很多字段/列,那么如果您只是想通过对T
的众多列之一进行操作来制作一个额外的列,那么传递整行似乎非常低效。我的问题是,Catalyst 是否足够聪明,能够对此进行优化?
【问题讨论】:
【参考方案1】:Catalyst 是否足够聪明,能够对此进行优化?
tl;dr 不。见SPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressions。
目前 Spark SQL 的 Catalyst Optimizer 无法知道您在 Scala 代码中执行的操作。
引用SPARK-14083:
Dataset API 的一大优势是类型安全,但由于严重依赖用户定义的闭包/lambda,因此会以性能为代价。这些闭包通常比表达式慢,因为我们可以更灵活地优化表达式(已知数据类型,无虚函数调用等)。在许多情况下,查看这些闭包的字节码并弄清楚它们要做什么实际上并不难。如果我们能理解它们,那么我们就可以将它们直接转化为 Catalyst 表达式,以实现更优化的执行。
甚至提到了你的情况:
df.map(_.name)
// 等价于表达式col("name")
如您所见,它仍处于打开状态,我怀疑目前是否有人从事此工作。
您可以帮助 Spark Optimizer 的方法是 select
那一列,然后才使用带有单参数 UDF 的 map
运算符。
这肯定符合您不将整个 JVM 对象传递给您的函数的要求,但不会摆脱这种从内部行表示到 Scala 对象的缓慢反序列化(这将落在 JVM 上并占用一些空间,直到GC 发生)。
【讨论】:
【参考方案2】:我试图弄清楚自己,因为我在任何地方都找不到回应。
让我们有一个包含多个字段的案例类的数据集:
scala> case class A(x: Int, y: Int)
scala> val dfA = spark.createDataset[A](Seq(A(1, 2)))
scala> val dfX = dfA.map(_.x)
现在,如果我们检查优化后的计划,我们会得到以下信息:
scala> val plan = dfX.queryExecution.optimizedPlan
SerializeFromObject [input[0, int, true] AS value#8]
+- MapElements <function1>, obj#7: int
+- DeserializeToObject newInstance(class A), obj#6: A
+- LocalRelation [x#2, y#3]
根据更详细的plan.toJSON
DeserializeToObject
步骤假定x
和y
都存在。
正如您所证明的那样,例如以下 sn-p 它使用反射而不是直接接触仍然有效的 A
的字段。
val dfX = dfA.map(
_.getClass.getMethods.find(_.getName == "x").get.invoke(x).asInstanceOf[Int]
)
【讨论】:
以上是关于数据集上的地图功能是不是针对一列的操作进行了优化?的主要内容,如果未能解决你的问题,请参考以下文章