Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作

Posted

技术标签:

【中文标题】Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作【英文标题】:Spark/Scala: Operations on some components from a DataFrame with Array typed column 【发布时间】:2018-04-25 01:38:58 【问题描述】:

让我用一个例子来解释我想要实现的目标。 以 DataFrame 开头,如下所示:

val df = Seq((1, "CS", 0, (0.1, 0.2, 0.4, 0.5)), 
             (4, "Ed", 0, (0.4, 0.8, 0.3, 0.6)),
             (7, "CS", 0, (0.2, 0.5, 0.4, 0.7)),
             (101, "CS", 1, (0.5, 0.7, 0.3, 0.8)),
             (5, "CS", 1, (0.4, 0.2, 0.6, 0.9)))
             .toDF("id", "dept", "test", "array")
+---+----+----+--------------------+
| id|dept|test|               array|
+---+----+----+--------------------+
|  1|  CS|   0|[0.1, 0.2, 0.4, 0.5]|
|  4|  Ed|   0|[0.4, 0.8, 0.3, 0.6]|
|  7|  CS|   0|[0.2, 0.5, 0.4, 0.7]|
|101|  CS|   1|[0.5, 0.7, 0.3, 0.8]|
|  5|  CS|   1|[0.4, 0.2, 0.6, 0.9]|
+---+----+----+--------------------+

我想根据id、dept和test列的信息来改变数组列的一些元素。我首先将索引添加到不同部门的每一行,如下所示:

@transient val w = Window.partitionBy("dept").orderBy("id")
val tempdf = df.withColumn("Index", row_number().over(w))
tempdf.show
+---+----+----+--------------------+-----+
| id|dept|test|               array|Index|
+---+----+----+--------------------+-----+
|  1|  CS|   0|[0.1, 0.2, 0.4, 0.5]|    1|
|  5|  CS|   1|[0.4, 0.2, 0.6, 0.9]|    2|
|  7|  CS|   0|[0.2, 0.5, 0.4, 0.7]|    3|
|101|  CS|   1|[0.5, 0.7, 0.3, 0.8]|    4|
|  4|  Ed|   0|[0.4, 0.8, 0.3, 0.6]|    1|
+---+----+----+--------------------+-----+

我想要实现的是从数组列中的一个元素中减去一个常量(0.1),它的位置对应于每个部门内行的索引。例如,在“dept==CS”的情况下,最终结果应该是:

+---+----+----+--------------------+-----+
| id|dept|test|               array|Index|
+---+----+----+--------------------+-----+
|  1|  CS|   0|[0.0, 0.2, 0.4, 0.5]|    1|
|  5|  CS|   1|[0.4, 0.1, 0.6, 0.9]|    2|
|  7|  CS|   0|[0.2, 0.5, 0.3, 0.7]|    3|
|101|  CS|   1|[0.5, 0.7, 0.3, 0.7]|    4|
|  4|  Ed|   0|[0.4, 0.8, 0.3, 0.6]|    1|
+---+----+----+--------------------+-----+

目前,我正在考虑使用 udf 实现这一点,如下所示:

def subUdf = udf((array: Seq[Double], dampFactor: Double, additionalIndex: Int) => additionalIndex match
   case 0 => array
   case _ =>  val temp = array.zipWithIndex
     var mask = Array.fill(array.length)(0.0)
     mask(additionalIndex-1) = dampFactor
     val tempAdj = temp.map(x => if (additionalIndex == (x._2+1)) (x._1-mask, x._2) else x)
       tempAdj.map(_._1)
             
      
  )
val dampFactor = 0.1
val finaldf = tempdf.withColumn("array", subUdf(tempdf("array"), dampFactor, when(tempdf("dept") === "CS" && tempdf("test") === 0, tempdf("Index")).otherwise(lit(0)))).drop("Index")

由于重载方法导致udf编译错误:

Name: Compile Error
Message: <console>:34: error: overloaded method value - with alternatives:
  (x: Double)Double <and>
  (x: Float)Double <and>
  (x: Long)Double <and>
  (x: Int)Double <and>
  (x: Char)Double <and>
  (x: Short)Double <and>
  (x: Byte)Double
 cannot be applied to (Array[Double])
            val tempAdj = temp.map(x => if (additionalIndex == (x._2+1)) (x._1-mask, x._2) else x)
           ^

两个相关问题:

    如何解决编译错误?

    我也愿意建议使用 udf 以外的方法来实现这一目标。

【问题讨论】:

【参考方案1】:

如果我正确理解您的要求,您可以创建一个UDF,它采用damperFactor、数组列和窗口索引列来转换数据框,如下所示:

val df = Seq(
  (1, "CS", 0, Seq(0.1, 0.2, 0.4, 0.5)), 
  (4, "Ed", 0, Seq(0.4, 0.8, 0.3, 0.6)),
  (7, "CS", 0, Seq(0.2, 0.5, 0.4, 0.7)),
  (101, "CS", 1, Seq(0.5, 0.7, 0.3, 0.8)),
  (5, "CS", 1, Seq(0.4, 0.2, 0.6, 0.9))
).toDF("id", "dept", "test", "array")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("dept").orderBy("id")
val tempdf = df.withColumn("index", row_number().over(w))

def adjustSeq(dampFactor: Double) = udf(
  (seq: Seq[Double], index: Int) =>
    seq.indices.map(i =>
      if (i == index - 1) seq(i) - dampFactor else seq(i)
    )
)

val finaldf = tempdf.
  withColumn("array", adjustSeq(0.1)($"array", $"index")).
  drop("index")

finaldf.show(false)
// +---+----+----+------------------------------------+
// |id |dept|test|array                               |
// +---+----+----+------------------------------------+
// |1  |CS  |0   |[0.0, 0.2, 0.4, 0.5]                |
// |5  |CS  |1   |[0.4, 0.1, 0.6, 0.9]                |
// |7  |CS  |0   |[0.2, 0.5, 0.30000000000000004, 0.7]|
// |101|CS  |1   |[0.5, 0.7, 0.3, 0.7000000000000001] |
// |4  |Ed  |0   |[0.30000000000000004, 0.8, 0.3, 0.6]|
// +---+----+----+------------------------------------+

您的示例代码似乎包含一些未在要求中描述的附加逻辑:

val finaldf = tempdf.withColumn("array", subUdf(tempdf("array"), 阻尼因子,当(tempdf(“部门”)===“CS”&& tempdf(“测试”)=== 0, tempdf("Index")).otherwise(lit(0)))).drop("Index")

要考虑额外的逻辑:

def adjustSeq(dampFactor: Double) = udf(
  (seq: Seq[Double], index: Int, dept: String, test: Int) =>
    (`dept`, `test`) match 
      case ("CS", 0) =>
        seq.indices.map(i =>
          if (i == index - 1) seq(i) - dampFactor else seq(i)
        )
      case _ => seq
    
)

val finaldf = tempdf.
  withColumn("array", adjustSeq(0.1)($"array", $"index", $"dept", $"test")).
  drop("index")

finaldf.show(false)
// +---+----+----+------------------------------------+
// |id |dept|test|array                               |
// +---+----+----+------------------------------------+
// |1  |CS  |0   |[0.0, 0.2, 0.4, 0.5]                |
// |5  |CS  |1   |[0.4, 0.2, 0.6, 0.9]                |
// |7  |CS  |0   |[0.2, 0.5, 0.30000000000000004, 0.7]|
// |101|CS  |1   |[0.5, 0.7, 0.3, 0.8]                |
// |4  |Ed  |0   |[0.4, 0.8, 0.3, 0.6]                |
// +---+----+----+------------------------------------+

【讨论】:

谢谢Leo,你的理解是正确的,包括附加的逻辑部分。我对 UDF 的理解比较浅,Spark 中的 Catalyst 优化器没有对 UDF 进行优化。但是对于这个问题,我现在还没有一个明显的解决方案。您能评论一下 UDF 在实践中的使用频率吗?我们是否需要为使用大量 UDF 而担心? 你说得对,Spark 的内置 SQL 函数通常比 UDF 执行得更好,因此只要适用,就应该选择内置函数而不是 UDF。对于 Spark 的内置函数集未涵盖的复杂数据转换,UDF 确实开放了庞大的 Scala API 库(使用 Scala 时)。由于 Spark 和 Scala 都在 JVM 上运行(并且 Spark 是用 Scala 编写的),因此 Scala 中的 UDF 已经受到最小的性能损失。

以上是关于Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作的主要内容,如果未能解决你的问题,请参考以下文章

spark scala比较具有时间戳列的数据帧

从 HDFS 加载数据 -Spark Scala [重复]

使用 Spark Scala 对数组元素求和

在 spark scala 中对数据框的每一列进行排序

如何对 spark scala RDD 中的元组列表/数组执行转换?

spark scala数据框中所有列的值都为空