如何优化 spark 函数以用零替换空值?

Posted

技术标签:

【中文标题】如何优化 spark 函数以用零替换空值?【英文标题】:How can I optimize the spark function to replace nulls with zeroes? 【发布时间】:2019-06-04 21:27:46 【问题描述】:

下面是我的 Spark 函数,它处理 DataFrame 列中的空值,无论其数据类型如何。

  def nullsToZero(df:DataFrame,nullsToZeroColsList:Array[String]): DataFrame =
    var y:DataFrame = df
    for(colDF <- y.columns)
      if(nullsToZeroColsList.contains(colDF))
        y = y.withColumn(colDF,expr("case when "+colDF+" IS NULL THEN 0 ELSE "+colDF+" end"))
      
    
    return y
  

    import spark.implicits._
    val personDF = Seq(
      ("miguel", Some(12),100,110,120), (null, Some(22),200,210,220), ("blu", None,300,310,320)
    ).toDF("name", "age","number1","number2","number3")
    println("Print Schema")
    personDF.printSchema()
    println("Show Original DF")
    personDF.show(false)
    val myColsList:Array[String] = Array("name","age","age")
    println("NULLS TO ZERO")
    println("Show NullsToZeroDF")
    val fixedDF = nullsToZero(personDF,myColsList)

在上面的代码中,我有一个整数类型和一个字符串类型的数据类型,两者都由我的函数处理。 但我怀疑下面这段代码,在我的函数中可能会影响性能但不确定。

y = y.withColumn(colDF,expr("case when "+colDF+" IS NULL THEN 0 ELSE "+colDF+" end"))

有没有更优化的方法可以编写这个函数,做 .withColumn() 并一次又一次地重新分配一个 DF 有什么意义? 提前谢谢你。

【问题讨论】:

【参考方案1】:

我建议为na.fill(valueMap) 组装一个valueMap 以根据数据类型用特定值填充null 列,如下所示:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (Some(1), Some("a"), Some("x"), None),
  (None,    Some("b"), Some("y"), Some(20.0)),
  (Some(3), None,      Some("z"), Some(30.0))
).toDF("c1", "c2", "c3", "c4")

val nullColList = List("c1", "c2", "c4")

val valueMap = df.dtypes.filter(x => nullColList.contains(x._1)).
  collect case (c, t) => t match 
    case "StringType" => (c, "n/a")
    case "IntegerType" => (c, 0)
    case "DoubleType" => (c, Double.MinValue)
   .toMap
// valueMap: scala.collection.immutable.Map[String,Any] = 
//   Map(c1 -> 0, c2 -> n/a, c4 -> -1.7976931348623157E308)

df.na.fill(valueMap).show
// +---+---+---+--------------------+
// | c1| c2| c3|                  c4|
// +---+---+---+--------------------+
// |  1|  a|  x|-1.79769313486231...|
// |  0|  b|  y|                20.0|
// |  3|n/a|  z|                30.0|
// +---+---+---+--------------------+

【讨论】:

这真是个不错的 Leo,对于必须处理所有列的 DF 来说,这是一个完美的镜头,但就我而言,我不必处理给定 DF 的所有列,只有我需要担心的列列表。我仍然可以使用上面的方法,创建 val x = DF diff listOfColumns,然后在 listOfColumns 上执行上面的操作并将 DF 添加回原始 DF。我在问题中的做法会影响性能吗? @Pavan_Obj,请参阅处理选定列的修订解决方案。我希望通过na.fill 进行的单个转换比通过withColumn 进行的多个转换更有效。 写得很漂亮,我已经测试了运行了 40 到 50 分钟的代码,我将运行这个更改并保持 --conf spark 不变。 谢谢,Leo,这也帮助我完成了下面val myNewMap:Map[String,Any] = Map("someStringTypeCol" -&gt; null, "someIntTypeCol" -&gt; null, "someStringTypeCol" -&gt; 0) 之类的操作,以防万一我必须填写上面的内容。

以上是关于如何优化 spark 函数以用零替换空值?的主要内容,如果未能解决你的问题,请参考以下文章

如何优化此查询并替换 MAX 函数?

如何优化 spark 函数以将双精度值舍入到小数点后 2 位?

Spark fillNa 不替换空值

Bigquery 用一些 000 替换空结果或空值

Apache Spark 2.2中基于成本的优化器(CBO)(转载)

如何优化子字符串和字符串或替换函数进行调试