来自示例 Java 程序的 Spark UDF 反序列化错误
Posted
技术标签:
【中文标题】来自示例 Java 程序的 Spark UDF 反序列化错误【英文标题】:Spark UDF deserialization error from sample Java program 【发布时间】:2020-10-10 21:32:32 【问题描述】:这个例子直接取自Spark example code,所以我有点不知所措。
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class TestSpark
public static void main(String[] args)
SparkSession spark = SparkSession.builder()
.appName("testspark")
.master("spark://0.0.0.0:7077")
// Yes this file exists, and contains this class
.config("spark.jars", "out/artifacts/testspark_jar/testspark.jar")
.getOrCreate();
// This works
spark.sql("SELECT 5 + 1").show();
spark.udf().register("plusOne", (UDF1<Integer, Integer>) x -> x + 1, DataTypes.IntegerType);
// This fails
spark.sql("SELECT plusOne(5)").show();
我在本地主机上运行的 Spark Standalone 集群上运行它。
工人总是失败:
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2205)
at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2168)
at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1422)
at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2450)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2295)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
我在 Java 11 上运行,使用 Spark 3.0.1。
我确实发现了这个非常相似的问题,它看起来就是答案:java.lang.ClassCastException using lambda expressions in spark job on remote server
但是,在确保将我的 TestSpark 编译为提供给 SparkSession 的 JAR 之后,我仍然遇到相同的错误。
任何帮助将不胜感激。似乎在 Java/Scala 边界发生了一些事情,但我对 Scala 互操作的了解还不够,无法进一步分析。
【问题讨论】:
【参考方案1】:我已经链接的问题 (java.lang.ClassCastException using lambda expressions in spark job on remote server) 的答案是正确的。
它又被混淆了,因为我没有正确编译 JAR(我认为清单中有一些东西,但不是 100% 确定)。
正确编译 JAR(使用 mvn package
而不是我的 IDE)并在 spark.jars
配置属性中提供该 JAR 后,代码按预期工作。
【讨论】:
以上是关于来自示例 Java 程序的 Spark UDF 反序列化错误的主要内容,如果未能解决你的问题,请参考以下文章
Spark UDF:如何在每一行上编写一个 UDF 以提取嵌套结构中的特定值?
我可以使用java从spark UDF返回一个java对象吗
如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?