在 Spark 中使用 UDF 时出现 NullPointerException

Posted

技术标签:

【中文标题】在 Spark 中使用 UDF 时出现 NullPointerException【英文标题】:NullPointerException when using UDF in Spark 【发布时间】:2017-03-30 11:05:55 【问题描述】:

我在 Spark 中有一个 DataFrame,比如这个:

var df = List(
  (1,"NUM.0002*NUM.0003"),
  (2,"NUM.0004+NUM.0003"),
  (3,"END(6)"),
  (4,"END(4)")
).toDF("CODE", "VALUE")

+----+---------------------+
|CODE|                VALUE|
+----+---------------------+
|   1|NUM.0002*NUM.0003|
|   2|NUM.0004+NUM.0003|
|   3|               END(6)|
|   4|               END(4)|
+----+---------------------+

我的任务是遍历 VALUE 列并执行以下操作:检查是否有 NUM.XXXX 等子字符串,获取 XXXX 数字,获取 $"CODE" === XXXX 所在的行,然后用该行中的 VALUE 字符串替换 NUM.XXX 子字符串。

我希望数据框最终看起来像这样:

+----+--------------------+
|CODE|               VALUE|
+----+--------------------+
|   1|END(4)+END(6)*END(6)|
|   2|       END(4)+END(6)|
|   3|              END(6)|
|   4|              END(4)|
+----+--------------------+

这是我想出的最好的:

val process = udf((ln: String) => 
  var newln = ln
  while(newln contains "NUM.")
    var num = newln.slice(newln.indexOf("")+5, newln.indexOf("")).toInt 
    var new_value = df.where($"CODE" === num).head.getAs[String](1)
    newln = newln.replace(newln.slice(newln.indexOf(""),newln.indexOf("")+1), new_value)
  
  newln
)

var df2 = df.withColumn("VALUE", when('VALUE contains "NUM.",process('VALUE)).otherwise('VALUE))

不幸的是,当我尝试过滤/选择/保存 df2 时,我得到了 NullPointerException,而当我只显示 df2 时没有错误。我相信当我在 UDF 中访问 DataFrame df 时会出现错误,但我需要每次迭代都访问它,所以我不能将它作为输入传递。另外,我尝试在 UDF 中保存df 的副本,但我不知道该怎么做。我可以在这里做什么?

非常欢迎任何改进算法的建议!谢谢!

【问题讨论】:

【参考方案1】:

我写了一些我认为有效但不是很优化的东西。我实际上在初始 DataFrame 上进行递归连接,以用 END 替换 NUM。这是代码:

    case class Data(code: Long, value: String)

    def main(args: Array[String]): Unit = 
        val sparkSession: SparkSession = SparkSession.builder().master("local").getOrCreate()

        val data = Seq(
            Data(1,"NUM.0002*NUM.0003"),
            Data(2,"NUM.0004+NUM.0003"),
            Data(3,"END(6)"),
            Data(4,"END(4)"),
            Data(5,"NUM.0002")
        )

        val initialDF = sparkSession.createDataFrame(data)
        val endDF = initialDF.filter(!(col("value") contains "NUM"))
        val numDF = initialDF.filter(col("value") contains "NUM")

        val resultDF = endDF.union(replaceNumByEnd(initialDF, numDF))
        resultDF.show(false)
    


    val parseNumUdf = udf((value: String) => 
        if (value.contains("NUM")) 
            val regex = """.*?\NUM\.(\d+)\.*""".r
            value match 
                case regex(code) => code.toLong
            
         else 
            -1L
        
    )

    val replaceUdf = udf((value: String, replacement: String) => 
        val regex = """\NUM\.(\d+)\""".r
        regex.replaceFirstIn(value, replacement)
    )

    def replaceNumByEnd(initialDF: DataFrame, currentDF: DataFrame): DataFrame = 
        if (currentDF.count() == 0) 
            currentDF
         else 
            val numDFWithCode = currentDF
                .withColumn("num_code", parseNumUdf(col("value")))
                .withColumnRenamed("code", "code_original")
                .withColumnRenamed("value", "value_original")

            val joinedDF = numDFWithCode.join(initialDF, numDFWithCode("num_code") === initialDF("code"))

            val replacedDF = joinedDF.withColumn("value_replaced", replaceUdf(col("value_original"), col("value")))

            val nextDF = replacedDF.select(col("code_original").as("code"), col("value_replaced").as("value"))

            val endDF = nextDF.filter(!(col("value") contains "NUM"))
            val numDF = nextDF.filter(col("value") contains "NUM")

            endDF.union(replaceNumByEnd(initialDF, numDF))
        
    

如果您需要更多解释,请不要犹豫。

【讨论】:

以上是关于在 Spark 中使用 UDF 时出现 NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章

将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException

编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java

编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java

将 pyspark pandas_udf 与 AWS EMR 一起使用时出现“没有名为‘pandas’的模块”错误

在 pig 中使用 UDF 时出现错误 1070

我可以将 spark 数据帧作为参数发送给 pandas UDF