org.apache.spark.SparkException: Failed to execute user defined function


【Title】org.apache.spark.SparkException: Failed to execute user defined function 【Posted】2017-08-17 15:24:49 【Question】:

I am new to Scala and I am trying to run the following code:

val SetID = udf { (c: String, d: String) =>
  if (c.UpperCase.contains("EXKLUS") == true) d
  else ""
}

val ParquetWithID = STG1
  .withColumn("ID", SetID( col("line_item"), col("line_item_ID")))

Both columns (line_item and line_item_ID) are defined as Strings in the STG1 schema.

When I try to run the code, I get the following error:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1$$anonfun$2: (string, string) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.NullPointerException
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:356)
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:355)
    ... 16 more

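I also tried c.UpperCase().contains("EXKLUS"), but I got the same error. However, if I just run a plain "if equals" statement, everything works. So I guess the problem lies in using UpperCase().contains(" ") inside my udf, but I don't understand where it comes from. Any help would be appreciated!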


【Answer 1】:

If the schema contains

 |-- line_item: string (nullable = true)
 |-- line_item_ID: string (nullable = true)

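then checking for null in the if condition should fix the problem. Because both columns are nullable, c can be null for some rows, and calling a method on null throws the NullPointerException shown in the stack trace. (Note also that the String method is toUpperCase, not UpperCase.)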

import org.apache.spark.sql.functions.{col, udf}

// Guard against null before calling a method on c
val SetID = udf { (c: String, d: String) =>
  if (c != null && c.toUpperCase.contains("EXKLUS")) d
  else ""
}

val ParquetWithID = STG1
  .withColumn("ID", SetID( col("line_item"), col("line_item_ID")))
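
An alternative to the explicit null comparison is to wrap the possibly-null value in Option. What follows is a minimal sketch, not part of the original answer: `NullSafeSetId` and `setId` are hypothetical names, and the function is shown as plain Scala without Spark so that it can be run on its own.

```scala
// Hypothetical helper (names are not from the original post): the plain
// Scala function that would be wrapped with udf(...).
object NullSafeSetId {
  // Option(c) is None when c is null, so exists(...) returns false
  // instead of throwing a NullPointerException.
  def setId(c: String, d: String): String =
    if (Option(c).exists(_.toUpperCase.contains("EXKLUS"))) d else ""

  def main(args: Array[String]): Unit = {
    println(setId("Exklusiv Paket", "42")) // matching line_item keeps the ID
    println(setId("Standard", "42"))       // non-matching line_item gives ""
    println(setId(null, "42"))             // null line_item gives "", no NPE
  }
}
```

Under these assumptions, the function should be usable in Spark as `val SetID = udf(NullSafeSetId.setId _)`, with the same withColumn call as above.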

I hope this answer helps.

【Comments】:

Glad to hear that, @Inna, and thanks for accepting :) Any suggestions on how to handle this UDF problem? ***.com/questions/63935600/…
