如何在 Scala Spark 中使用具有许多条件的“.withColumn”为数据集创建新列

Posted

技术标签:

【中文标题】如何在 Scala Spark 中使用具有许多条件的“.withColumn”为数据集创建新列【英文标题】:How to create a new column for dataset using ".withColumn" with many conditions in Scala Spark 【发布时间】:2018-12-19 05:27:46 【问题描述】:

我有以下输入数组

val bins = (("bin1",1.0,2.0),("bin2",3.0,4.0),("bin3",5.0,6.0))

基本上,字符串“bin1”是指在其上过滤数据框的参考列中的值 - 根据数组中剩余两个双精度的边界条件从另一列创建新列

var number_of_dataframes = bins.length
var ctempdf = spark.createDataFrame(sc.emptyRDD[Row],train_data.schema)
ctempdf = ctempdf.withColumn(colName,col(colName))
val t1 = System.nanoTime
for ( x<- 0 to binputs.length-1)


      var tempdf = train_data.filter(col(refCol) === bins(x)._1)
      //println(binputs(x)._1)
      tempdf = tempdf.withColumn(colName,
                                 when(col(colName) < bins(x)._2, bins(x)._2)
                                 when(col(colName) > bins(x)._3, bins(x)._3)
                                 otherwise(col(colName)))
      ctempdf = ctempdf.union(tempdf)
val duration = (System.nanoTime - t1) / 1e9d
println(duration)     

上面的代码对于每一个增加的 bin 值都会逐渐缓慢地工作 - 有没有办法可以大大加快速度 - 因为这段代码再次嵌套在另一个循环中。

我使用过检查点/持久化/缓存,但这些都没有帮助

【问题讨论】:

【参考方案1】:

这里不需要迭代联合。使用o.a.s.sql.functions.map 创建一个文字map&lt;string, struct&lt;double, double&gt;&gt;(在功能上它的行为类似于延迟string =&gt; struct&lt;lower: dobule, upper: double&gt;

import org.apache.spark.sql.functions._

val bins: Seq[(String, Double Double)] = Seq(
  ("bin1",1.0,2.0),("bin2",3.0,4.0),("bin3",5.0,6.0))

val binCol = map(bins.map  
  case (key, lower, upper) => Seq(
    lit(key), 
    struct(lit(lower) as "lower", lit(upper) as "upper")) 
.flatten: _*)

定义这样的表达式(这些是预定义映射中的简单查找,因此 binCol(col(refCol)) 被延迟 struct&lt;lower: dobule, upper: double&gt; 并且剩余的 apply 采用 lowerupper 字段):

val lower = binCol(col(refCol))("lower")
val upper =  binCol(col(refCol))("upper")
val c = col(colName)

并使用CASE ... WHEN ... (Spark Equivalent of IF Then ELSE)

val result = when(c.between(lower, upper), c)
  .when(c < lower, lower)
  .when(c > upper, upper)

选择并删除NULLs:

df
  .withColumn(colName, result)
  // If value is still NULL it means we didn't find refCol key in binCol keys.
  // To mimic .filter(col(refCol) === ...) we drop the rows
  .na.drop(Seq(colName))

此解决方案假定一开始colName 中没有NULL 值,但可以轻松调整以处理不满足此假设的情况。

如果过程仍然不清楚,我建议使用文字逐步跟踪它:

spark.range(1).select(binCol as "map").show(false)
+------------------------------------------------------------+
|map                                                         |
+------------------------------------------------------------+
|[bin1 -> [1.0, 2.0], bin2 -> [3.0, 4.0], bin3 -> [5.0, 6.0]]|
+------------------------------------------------------------+
spark.range(1).select(binCol(lit("bin1")) as "value").show(false)
+----------+
|value     |
+----------+
|[1.0, 2.0]|
+----------+
spark.range(1).select(binCol(lit("bin1"))("lower") as "value").show
+-----+
|value|
+-----+
|  1.0|
+-----+

并进一步参考Querying Spark SQL DataFrame with complex types。

【讨论】:

以上是关于如何在 Scala Spark 中使用具有许多条件的“.withColumn”为数据集创建新列的主要内容,如果未能解决你的问题,请参考以下文章

scala - Spark:如何在 groupedData 中获取带有条件的结果集

如何在 if-else 条件下的列中使用 Spark 值 - Scala

如何使用 scala 根据 spark 中的条件获取 row_number()

如何使用 Scala 运行具有分类特征集的 Spark 决策树?

如何在 Spark Scala 的 UDF 中将列作为值传递以检查条件

在 spark scala 中为 withcolumn 编写通用函数