Apache Spark: Diagnosing and fixing the "Task not serializable" exception

Posted by 你是小KS


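1. Overview

This post walks through a java.io.NotSerializableException that Spark suddenly threw during a sort, and the reasoning that led to the fix.

2. Reproducing the failing code

Entity class: User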

import java.io.Serializable;

public class User implements Serializable {
	private static final long serialVersionUID = 1L;
	private Integer id;
	private String name;
	private Double score;
	// getters, setters, toString and constructors omitted
}

Note that the entity class already implements Serializable!

The actual code:

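import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SortDataOperationTest {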
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local").setAppName("sort data test");
		JavaSparkContext jsc = new JavaSparkContext(conf);
		sortOperation(jsc);
		jsc.close();
	}

	private static void sortOperation(JavaSparkContext jsc) {
		// List<Integer> intList = Arrays.asList(1, 5, 8, 4, 3, 1, 2, 9, 7, -1);
		// JavaRDD<Integer> parallelize = jsc.parallelize(intList);
		List<User> userList = Arrays.asList(new User(1, "zs", 50.2), new User(2, "ls", 70.5), new User(3, "ww", 65.2));
		// Sort by key: pair each user with its id and sort on that Integer key
		jsc.parallelize(userList).mapToPair(new PairFunction<User, Integer, User>() {

			@Override
			public Tuple2<Integer, User> call(User t) throws Exception {
				return new Tuple2<Integer, User>(t.getId(), t);
			}
		}).sortByKey().foreach(x -> System.out.println(x));

		// Sort with User itself as the key, ordered by id through a custom comparator
		Comparator<User> comparator = new Comparator<User>() {

			@Override
			public int compare(User o1, User o2) {
				return o1.getId() - o2.getId();
			}
		};

		jsc.parallelize(userList).mapToPair(new PairFunction<User, User, Integer>() {
			@Override
			public Tuple2<User, Integer> call(User t) throws Exception {
				return new Tuple2<User, Integer>(t, 1);
			}
		}).sortByKey(comparator).foreach(x -> System.out.println(x));

	}

}

3. The error at run time

Serialization stack:
	- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
	- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
	- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
	- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
	- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
21/08/29 13:46:05 INFO DAGScheduler: Job 1 failed: foreach at SortDataOperationTest.java:78, took 0.053300 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.hy.spark.test.basic.SortDataOperationTest$2
Serialization stack:
	- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
	- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
	- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
	- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
	- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1167)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1071)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1074)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1073)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1073)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1014)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:972)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
	at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
	at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
	at com.hy.spark.test.basic.SortDataOperationTest.sortOperation(SortDataOperationTest.java:78)
	at com.hy.spark.test.basic.SortDataOperationTest.main(SortDataOperationTest.java:30)
Caused by: java.io.NotSerializableException: com.hy.spark.test.basic.SortDataOperationTest$2
Serialization stack:
	- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
	- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
	- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
	- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
	- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1155)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1071)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1074)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1073)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1073)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1014)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

That is a lot of error output, yet the first sort ran without any error!

4. Tracking down the error

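This shows that sortByKey() itself is fine, so I went hunting for whichever class did not implement the serialization interface...

I started by checking whether User implements Serializable, and then noticed com.hy.spark.test.basic.SortDataOperationTest$2 in the log. Could an anonymous class be the cause?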
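The `$2` suffix can be checked in isolation. The sketch below is a standalone illustration, not part of the original project: the class name AnonymousNames is hypothetical, and the numbering is how javac typically names anonymous classes (enclosing class, `$`, then a counter in order of appearance). It declares two anonymous classes and returns their runtime names.

```java
import java.util.Comparator;

public class AnonymousNames {

    // Returns the runtime class names of two anonymous classes, in declaration order.
    static String[] names() {
        Runnable first = new Runnable() {
            @Override
            public void run() {
                // intentionally empty
            }
        };
        Comparator<String> second = new Comparator<String>() {
            @Override
            public int compare(String a, String b) {
                return a.compareTo(b);
            }
        };
        return new String[] { first.getClass().getName(), second.getClass().getName() };
    }

    public static void main(String[] args) {
        // Each name is the enclosing class name followed by '$' and a number.
        for (String name : names()) {
            System.out.println(name);
        }
    }
}
```

Read the same way, SortDataOperationTest$2 would be the second anonymous class in SortDataOperationTest, which is the Comparator (the first one is the PairFunction passed to the first mapToPair).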

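Eventually I turned to the field entries in the serialization stack.

I assumed Spark would not be so naive as to trip over a plain Object field, so I went through the types listed there and made every class I could Serializable. The error was still there!

In the end I narrowed it down to this: every Object and other class that could be made serializable already was, and the remaining ones are classes I cannot change. The first two lines of the serialization stack point the same way: the object that is not serializable is SortDataOperationTest$2, and it is held in a field of type java.util.Comparator. Spark's PairFunction interface extends Serializable, but java.util.Comparator does not, so the likely culprit was the anonymous Comparator.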
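The suspicion can be tested without Spark. The stack trace shows the failure inside JavaSerializationStream.writeObject, so the sketch below assumes plain Java serialization (ObjectOutputStream) is a fair stand-in for what Spark does with the comparator. The class and method names are hypothetical, and it compares String values by length instead of User objects to stay self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class ComparatorSerializationCheck {

    // Named comparator that also implements Serializable (hypothetical example class).
    static class LengthComparator implements Comparator<String>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(String a, String b) {
            return Integer.compare(a.length(), b.length());
        }
    }

    // Anonymous comparator: java.util.Comparator does not extend Serializable.
    static Comparator<String> anonymousComparator() {
        return new Comparator<String>() {
            @Override
            public int compare(String a, String b) {
                return Integer.compare(a.length(), b.length());
            }
        };
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous: " + isJavaSerializable(anonymousComparator()));
        System.out.println("named:     " + isJavaSerializable(new LengthComparator()));
    }
}
```

A check like this fails fast on the driver, before any job is submitted, which makes it a cheap way to vet an object that will be handed to a Spark transformation.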

5. Verifying the fix

Extract the anonymous comparator into a named class that also implements Serializable:

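static class UserComparator implements Comparator<User>, Serializable {
	private static final long serialVersionUID = 1L;

	@Override
	public int compare(User o1, User o2) {
		// Double.compare keeps fractional differences that an (int) cast of the subtraction would truncate to 0
		return Double.compare(o1.getScore(), o2.getScore());
	}
}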
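A side note on comparing Double scores: a comparator has to tell apart values that differ by less than 1. The sketch below (hypothetical class and method names, plain JDK only) contrasts the shortcut of casting the difference to int with Double.compare, using two scores that are close together.

```java
public class ScoreCompareDemo {

    // Shortcut: subtract and cast to int; the cast truncates toward zero.
    static int byCast(double a, double b) {
        return (int) (a - b);
    }

    // JDK helper: returns a negative, zero, or positive value.
    static int byDoubleCompare(double a, double b) {
        return Double.compare(a, b);
    }

    public static void main(String[] args) {
        // -0.7 truncates to 0, so the two scores look equal to a sort.
        System.out.println(byCast(65.2, 65.9));
        // Negative result: 65.2 sorts before 65.9.
        System.out.println(byDoubleCompare(65.2, 65.9));
    }
}
```

With the cast, any two scores less than 1 apart compare as equal, so their relative order after sorting is not determined by score.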

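After replacing the anonymous Comparator in the code with UserComparator, the job succeeded. To make the output easier to read, I set the log level to WARN (jsc.setLogLevel("WARN");).

It works, which shows that a comparator passed to a Spark sort has to be serializable. The straightforward way is to write it as a named class that implements both Comparator and Serializable; otherwise the sort fails with a serialization error. Be careful with anonymous classes!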
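A named class is probably not the only option. Java also allows a lambda to be cast to an intersection type such as `Comparator<T> & Serializable`, which makes the lambda serializable. The sketch below is an assumption-laden illustration rather than something tried in the original project: the class and method names are hypothetical, it sorts String values instead of User, and it only round-trips the comparator through plain Java serialization in the same JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SerializableLambdaComparator {

    // The intersection cast asks the compiler for a lambda that is also Serializable.
    static Comparator<String> byLength() {
        return (Comparator<String> & Serializable) (a, b) -> Integer.compare(a.length(), b.length());
    }

    // Serializes the comparator to bytes and reads a copy back.
    @SuppressWarnings("unchecked")
    static Comparator<String> roundTrip(Comparator<String> comparator) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(comparator);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Comparator<String>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>(Arrays.asList("ccc", "a", "bb"));
        words.sort(roundTrip(byLength()));
        System.out.println(words); // [a, bb, ccc]
    }
}
```

The named class remains the more explicit choice; the cast is compact, but it is easy to overlook when reading the code later.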
