Spark 2.2 Null-safe Left Outer Join Null Pointer Exception
Posted: 2017-10-10 14:12:31

Performing a left outer join using the null-safe equality operator results in a NullPointerException.

Versions: Spark 2.2.0, Scala 2.11.8
scala> var d1 = Seq((null, 1), ("a1", 2)).toDF("a", "b")
scala> d1.show
+----+---+
| a| b|
+----+---+
|null| 1|
| a1| 2|
+----+---+
scala> var d2 = Seq(("a2", 3)).toDF("a", "b")
scala> d2.show
+---+---+
| a| b|
+---+---+
| a2| 3|
+---+---+
scala> d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show
17/10/10 09:44:39 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8)
java.lang.NullPointerException
Is this expected behavior?
Stack trace:
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
17/10/10 10:19:28 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
17/10/10 10:19:28 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
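Editor's note: the trace bottoms out in code emitted by whole-stage code generation (GeneratedClass$GeneratedIterator.processNext). A minimal diagnostic sketch, assuming a stock spark-shell session with the implicits already in scope, is to disable whole-stage codegen and re-run the join. This only helps isolate whether the generated-code path is involved; it is not presented as a fix:

// Diagnostic sketch: turn off whole-stage codegen and retry the join.
spark.conf.set("spark.sql.codegen.wholeStage", false)
val d1 = Seq((null, 1), ("a1", 2)).toDF("a", "b")
val d2 = Seq(("a2", 3)).toDF("a", "b")
d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show()
// Restore the default when done.
spark.conf.set("spark.sql.codegen.wholeStage", true)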
【参考方案1】:我可以说在 Spark 2.2.0 和 Scala 2.11.8 版本中没有发现任何问题,因为在使用具有 null 安全的相同代码示例时我没有遇到任何异常
和等号运算符
===
您能否再次检查并添加与问题相关的更多详细信息?
val d1 = sc.parallelize(Seq((null, 1), ("a1", 2))).toDF("a", "b")
d1.show
+----+---+
| a| b|
+----+---+
|null| 1|
| a1| 2|
+----+---+
val d2 = sc.parallelize(Seq(("a2", 3))).toDF("a", "b")
d2.show
+---+---+
| a| b|
+---+---+
| a2| 3|
+---+---+
d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show()
+--------+----+
| _1| _2|
+--------+----+
|[null,1]|null|
| [a1,2]|null|
+--------+----+
d1.joinWith(d2, d1("a") === d2("a"), "left_outer").show()
+--------+----+
| _1| _2|
+--------+----+
|[null,1]|null|
| [a1,2]|null|
+--------+----+
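One additional cross-check worth suggesting here, sketched under the assumption of the same spark-shell session: joinWith returns a Dataset of pairs and therefore exercises a typed encoding path that the untyped join and the SQL form do not. If the two variants below succeed while joinWith fails, the problem is narrowed to the typed path:

// Cross-check 1: same null-safe condition through the untyped join API.
d1.join(d2, d1("a") <=> d2("a"), "left_outer").show()

// Cross-check 2: same join expressed in SQL, bypassing joinWith entirely.
d1.createOrReplaceTempView("d1")
d2.createOrReplaceTempView("d2")
spark.sql("SELECT * FROM d1 LEFT OUTER JOIN d2 ON d1.a <=> d2.a").show()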
Adding another example:
val x = Seq((100L,null), (102L,"17179869185L"), (101L,"17179869186L"), (200L,"17179869186L"), (401L,"1L"), (500L,"1L"), (600L,"8589934593L"), (700L,"8589934593L"), (800L,"8589934593L"), (900L,"8589934594L"), (1000L,"8589934594L"), (1200L,"2L"), (1300L,"2L"), (1301L,"2L"), (1400L,"17179869187L"), (1500L,"17179869188L"), (1600L,"8589934595L")).toDF("u","x1")
x.show()
+----+------------+
| u| x1|
+----+------------+
| 100| null|
| 102|17179869185L|
| 101|17179869186L|
| 200|17179869186L|
| 401| 1L|
| 500| 1L|
| 600| 8589934593L|
| 700| 8589934593L|
| 800| 8589934593L|
| 900| 8589934594L|
|1000| 8589934594L|
|1200| 2L|
|1300| 2L|
|1301| 2L|
|1400|17179869187L|
|1500|17179869188L|
|1600| 8589934595L|
+----+------------+
val y = Seq(("17179869187L",-8589934595L), ("17179869188L",-8589934595L), ("17179869185L",-8589934593L)).toDF("x2","y")
y.show()
+------------+-----------+
| x2| y|
+------------+-----------+
|17179869187L|-8589934595|
|17179869188L|-8589934595|
|17179869185L|-8589934593|
+------------+-----------+
x.join(y,'x1 === 'x2, "left_outer").show()
+----+------------+------------+-----------+
| u| x1| x2| y|
+----+------------+------------+-----------+
| 100| null| null| null|
| 102|17179869185L|17179869185L|-8589934593|
| 101|17179869186L| null| null|
| 200|17179869186L| null| null|
| 401| 1L| null| null|
| 500| 1L| null| null|
| 600| 8589934593L| null| null|
| 700| 8589934593L| null| null|
| 800| 8589934593L| null| null|
| 900| 8589934594L| null| null|
|1000| 8589934594L| null| null|
|1200| 2L| null| null|
|1300| 2L| null| null|
|1301| 2L| null| null|
|1400|17179869187L|17179869187L|-8589934595|
|1500|17179869188L|17179869188L|-8589934595|
|1600| 8589934595L| null| null|
+----+------------+------------+-----------+
x: org.apache.spark.sql.DataFrame = [u: bigint, x1: string]
y: org.apache.spark.sql.DataFrame = [x2: string, y: bigint]
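Since this second example uses === rather than <=>, a brief aside on the semantics may help: the two operators differ only when null is involved. A minimal sketch, independent of the bug report, where the two operators diverge because both sides contain a null key:

val l = Seq((null, 1), ("k", 2)).toDF("a", "b")
val r = Seq((null, 10), ("k", 20)).toDF("a", "c")
// With ===, null === null evaluates to null (not true), so the null row finds no match.
l.join(r, l("a") === r("a"), "left_outer").show()
// With <=>, null <=> null evaluates to true, so the null rows join to each other.
l.join(r, l("a") <=> r("a"), "left_outer").show()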
Comments:
I tried your example and got the same output, but my original example still fails with the same error. Perhaps there is a difference between sc.parallelize().toDF() and Seq().toDF() (see the sketch after these comments).
@rdg It's been a while. Did you ever find a solution for this? I am hitting the same problem when creating a Dataset from a Seq.