在 Spark 中使用 UDF 时出现 NullPointerException
Posted
技术标签:
【中文标题】在 Spark 中使用 UDF 时出现 NullPointerException【英文标题】:NullPointerException when using UDF in Spark 【发布时间】:2017-03-30 11:05:55 【问题描述】:我在 Spark 中有一个 DataFrame,比如这个:
var df = List(
(1,"NUM.0002*NUM.0003"),
(2,"NUM.0004+NUM.0003"),
(3,"END(6)"),
(4,"END(4)")
).toDF("CODE", "VALUE")
+----+---------------------+
|CODE| VALUE|
+----+---------------------+
| 1|NUM.0002*NUM.0003|
| 2|NUM.0004+NUM.0003|
| 3| END(6)|
| 4| END(4)|
+----+---------------------+
我的任务是遍历 VALUE 列并执行以下操作:检查是否有 NUM.XXXX 等子字符串,获取 XXXX 数字,获取 $"CODE" === XXXX 所在的行,然后用该行中的 VALUE 字符串替换 NUM.XXX 子字符串。
我希望数据框最终看起来像这样:
+----+--------------------+
|CODE| VALUE|
+----+--------------------+
| 1|END(4)+END(6)*END(6)|
| 2| END(4)+END(6)|
| 3| END(6)|
| 4| END(4)|
+----+--------------------+
这是我想出的最好的:
val process = udf((ln: String) =>
var newln = ln
while(newln contains "NUM.")
var num = newln.slice(newln.indexOf("")+5, newln.indexOf("")).toInt
var new_value = df.where($"CODE" === num).head.getAs[String](1)
newln = newln.replace(newln.slice(newln.indexOf(""),newln.indexOf("")+1), new_value)
newln
)
var df2 = df.withColumn("VALUE", when('VALUE contains "NUM.",process('VALUE)).otherwise('VALUE))
不幸的是,当我尝试过滤/选择/保存 df2
时,我得到了 NullPointerException,而当我只显示 df2
时没有错误。我相信当我在 UDF 中访问 DataFrame df
时会出现错误,但我需要每次迭代都访问它,所以我不能将它作为输入传递。另外,我尝试在 UDF 中保存df
的副本,但我不知道该怎么做。我可以在这里做什么?
非常欢迎任何改进算法的建议!谢谢!
【问题讨论】:
【参考方案1】:我写了一些我认为有效但不是很优化的东西。我实际上在初始 DataFrame 上进行递归连接,以用 END 替换 NUM。这是代码:
case class Data(code: Long, value: String)
def main(args: Array[String]): Unit =
val sparkSession: SparkSession = SparkSession.builder().master("local").getOrCreate()
val data = Seq(
Data(1,"NUM.0002*NUM.0003"),
Data(2,"NUM.0004+NUM.0003"),
Data(3,"END(6)"),
Data(4,"END(4)"),
Data(5,"NUM.0002")
)
val initialDF = sparkSession.createDataFrame(data)
val endDF = initialDF.filter(!(col("value") contains "NUM"))
val numDF = initialDF.filter(col("value") contains "NUM")
val resultDF = endDF.union(replaceNumByEnd(initialDF, numDF))
resultDF.show(false)
val parseNumUdf = udf((value: String) =>
if (value.contains("NUM"))
val regex = """.*?\NUM\.(\d+)\.*""".r
value match
case regex(code) => code.toLong
else
-1L
)
val replaceUdf = udf((value: String, replacement: String) =>
val regex = """\NUM\.(\d+)\""".r
regex.replaceFirstIn(value, replacement)
)
def replaceNumByEnd(initialDF: DataFrame, currentDF: DataFrame): DataFrame =
if (currentDF.count() == 0)
currentDF
else
val numDFWithCode = currentDF
.withColumn("num_code", parseNumUdf(col("value")))
.withColumnRenamed("code", "code_original")
.withColumnRenamed("value", "value_original")
val joinedDF = numDFWithCode.join(initialDF, numDFWithCode("num_code") === initialDF("code"))
val replacedDF = joinedDF.withColumn("value_replaced", replaceUdf(col("value_original"), col("value")))
val nextDF = replacedDF.select(col("code_original").as("code"), col("value_replaced").as("value"))
val endDF = nextDF.filter(!(col("value") contains "NUM"))
val numDF = nextDF.filter(col("value") contains "NUM")
endDF.union(replaceNumByEnd(initialDF, numDF))
如果您需要更多解释,请不要犹豫。
【讨论】:
以上是关于在 Spark 中使用 UDF 时出现 NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章
将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java