Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作
Posted
技术标签:
【中文标题】Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作【英文标题】:Spark/Scala: Operations on some components from a DataFrame with Array typed column 【发布时间】:2018-04-25 01:38:58 【问题描述】:让我用一个例子来解释我想要实现的目标。 以 DataFrame 开头,如下所示:
val df = Seq((1, "CS", 0, (0.1, 0.2, 0.4, 0.5)),
(4, "Ed", 0, (0.4, 0.8, 0.3, 0.6)),
(7, "CS", 0, (0.2, 0.5, 0.4, 0.7)),
(101, "CS", 1, (0.5, 0.7, 0.3, 0.8)),
(5, "CS", 1, (0.4, 0.2, 0.6, 0.9)))
.toDF("id", "dept", "test", "array")
+---+----+----+--------------------+
| id|dept|test| array|
+---+----+----+--------------------+
| 1| CS| 0|[0.1, 0.2, 0.4, 0.5]|
| 4| Ed| 0|[0.4, 0.8, 0.3, 0.6]|
| 7| CS| 0|[0.2, 0.5, 0.4, 0.7]|
|101| CS| 1|[0.5, 0.7, 0.3, 0.8]|
| 5| CS| 1|[0.4, 0.2, 0.6, 0.9]|
+---+----+----+--------------------+
我想根据id、dept和test列的信息来改变数组列的一些元素。我首先将索引添加到不同部门的每一行,如下所示:
@transient val w = Window.partitionBy("dept").orderBy("id")
val tempdf = df.withColumn("Index", row_number().over(w))
tempdf.show
+---+----+----+--------------------+-----+
| id|dept|test| array|Index|
+---+----+----+--------------------+-----+
| 1| CS| 0|[0.1, 0.2, 0.4, 0.5]| 1|
| 5| CS| 1|[0.4, 0.2, 0.6, 0.9]| 2|
| 7| CS| 0|[0.2, 0.5, 0.4, 0.7]| 3|
|101| CS| 1|[0.5, 0.7, 0.3, 0.8]| 4|
| 4| Ed| 0|[0.4, 0.8, 0.3, 0.6]| 1|
+---+----+----+--------------------+-----+
我想要实现的是从数组列中的一个元素中减去一个常量(0.1),它的位置对应于每个部门内行的索引。例如,在“dept==CS”的情况下,最终结果应该是:
+---+----+----+--------------------+-----+
| id|dept|test| array|Index|
+---+----+----+--------------------+-----+
| 1| CS| 0|[0.0, 0.2, 0.4, 0.5]| 1|
| 5| CS| 1|[0.4, 0.1, 0.6, 0.9]| 2|
| 7| CS| 0|[0.2, 0.5, 0.3, 0.7]| 3|
|101| CS| 1|[0.5, 0.7, 0.3, 0.7]| 4|
| 4| Ed| 0|[0.4, 0.8, 0.3, 0.6]| 1|
+---+----+----+--------------------+-----+
目前,我正在考虑使用 udf 实现这一点,如下所示:
def subUdf = udf((array: Seq[Double], dampFactor: Double, additionalIndex: Int) => additionalIndex match
case 0 => array
case _ => val temp = array.zipWithIndex
var mask = Array.fill(array.length)(0.0)
mask(additionalIndex-1) = dampFactor
val tempAdj = temp.map(x => if (additionalIndex == (x._2+1)) (x._1-mask, x._2) else x)
tempAdj.map(_._1)
)
val dampFactor = 0.1
val finaldf = tempdf.withColumn("array", subUdf(tempdf("array"), dampFactor, when(tempdf("dept") === "CS" && tempdf("test") === 0, tempdf("Index")).otherwise(lit(0)))).drop("Index")
由于重载方法导致udf编译错误:
Name: Compile Error
Message: <console>:34: error: overloaded method value - with alternatives:
(x: Double)Double <and>
(x: Float)Double <and>
(x: Long)Double <and>
(x: Int)Double <and>
(x: Char)Double <and>
(x: Short)Double <and>
(x: Byte)Double
cannot be applied to (Array[Double])
val tempAdj = temp.map(x => if (additionalIndex == (x._2+1)) (x._1-mask, x._2) else x)
^
两个相关问题:
如何解决编译错误?
我也愿意建议使用 udf 以外的方法来实现这一目标。
【问题讨论】:
【参考方案1】:如果我正确理解您的要求,您可以创建一个UDF,它采用damperFactor、数组列和窗口索引列来转换数据框,如下所示:
val df = Seq(
(1, "CS", 0, Seq(0.1, 0.2, 0.4, 0.5)),
(4, "Ed", 0, Seq(0.4, 0.8, 0.3, 0.6)),
(7, "CS", 0, Seq(0.2, 0.5, 0.4, 0.7)),
(101, "CS", 1, Seq(0.5, 0.7, 0.3, 0.8)),
(5, "CS", 1, Seq(0.4, 0.2, 0.6, 0.9))
).toDF("id", "dept", "test", "array")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("dept").orderBy("id")
val tempdf = df.withColumn("index", row_number().over(w))
def adjustSeq(dampFactor: Double) = udf(
(seq: Seq[Double], index: Int) =>
seq.indices.map(i =>
if (i == index - 1) seq(i) - dampFactor else seq(i)
)
)
val finaldf = tempdf.
withColumn("array", adjustSeq(0.1)($"array", $"index")).
drop("index")
finaldf.show(false)
// +---+----+----+------------------------------------+
// |id |dept|test|array |
// +---+----+----+------------------------------------+
// |1 |CS |0 |[0.0, 0.2, 0.4, 0.5] |
// |5 |CS |1 |[0.4, 0.1, 0.6, 0.9] |
// |7 |CS |0 |[0.2, 0.5, 0.30000000000000004, 0.7]|
// |101|CS |1 |[0.5, 0.7, 0.3, 0.7000000000000001] |
// |4 |Ed |0 |[0.30000000000000004, 0.8, 0.3, 0.6]|
// +---+----+----+------------------------------------+
您的示例代码似乎包含一些未在要求中描述的附加逻辑:
val finaldf = tempdf.withColumn("array", subUdf(tempdf("array"), 阻尼因子,当(tempdf(“部门”)===“CS”&& tempdf(“测试”)=== 0, tempdf("Index")).otherwise(lit(0)))).drop("Index")
要考虑额外的逻辑:
def adjustSeq(dampFactor: Double) = udf(
(seq: Seq[Double], index: Int, dept: String, test: Int) =>
(`dept`, `test`) match
case ("CS", 0) =>
seq.indices.map(i =>
if (i == index - 1) seq(i) - dampFactor else seq(i)
)
case _ => seq
)
val finaldf = tempdf.
withColumn("array", adjustSeq(0.1)($"array", $"index", $"dept", $"test")).
drop("index")
finaldf.show(false)
// +---+----+----+------------------------------------+
// |id |dept|test|array |
// +---+----+----+------------------------------------+
// |1 |CS |0 |[0.0, 0.2, 0.4, 0.5] |
// |5 |CS |1 |[0.4, 0.2, 0.6, 0.9] |
// |7 |CS |0 |[0.2, 0.5, 0.30000000000000004, 0.7]|
// |101|CS |1 |[0.5, 0.7, 0.3, 0.8] |
// |4 |Ed |0 |[0.4, 0.8, 0.3, 0.6] |
// +---+----+----+------------------------------------+
【讨论】:
谢谢Leo,你的理解是正确的,包括附加的逻辑部分。我对 UDF 的理解比较浅,Spark 中的 Catalyst 优化器没有对 UDF 进行优化。但是对于这个问题,我现在还没有一个明显的解决方案。您能评论一下 UDF 在实践中的使用频率吗?我们是否需要为使用大量 UDF 而担心? 你说得对,Spark 的内置 SQL 函数通常比 UDF 执行得更好,因此只要适用,就应该选择内置函数而不是 UDF。对于 Spark 的内置函数集未涵盖的复杂数据转换,UDF 确实开放了庞大的 Scala API 库(使用 Scala 时)。由于 Spark 和 Scala 都在 JVM 上运行(并且 Spark 是用 Scala 编写的),因此 Scala 中的 UDF 已经受到最小的性能损失。以上是关于Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作的主要内容,如果未能解决你的问题,请参考以下文章