org.apache.spark.SparkException: Task not serializable

Posted by Shockang


Preface

This article belongs to my original column 《Spark异常问题汇总》; please credit the source when quoting, and feel free to point out mistakes and omissions in the comments. Thanks!

For the column's table of contents and references, see Spark异常问题汇总.

Main Text

Cause of the error

The "org.apache.spark.SparkException: Task not serializable" error usually means that the function passed to an operator such as map or filter references an external variable that cannot be serialized. (Referencing external variables is fine in itself; they just have to be made serializable.)

The most common case: referencing a member function or member variable of some class (often the current class) forces that entire class, with all its members, to be serializable.

Even when the class declares "extends Serializable", some of its fields may still not be serializable; the class then fails to serialize as a whole, and the result is the Task not serializable error.
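This behavior is not Spark-specific; plain Java serialization fails the same way. A minimal sketch (the `Unserializable` and `Wrapper` names are invented for illustration, with `Unserializable` standing in for something like SparkContext):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable field type such as SparkContext.
class Unserializable

// Declares Serializable, yet still fails to serialize, because every
// non-transient field must itself be serializable.
class Wrapper extends Serializable {
  val name: String = "ok"                       // fine: String serializes
  val ctx: Unserializable = new Unserializable  // this field breaks it
}

// Returns true if Java serialization of obj succeeds.
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }
```

Serializing a `Wrapper` throws java.io.NotSerializableException naming `Unserializable`, which is the same shape as the stack trace Spark prints for sc in the logs of this article.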

Practice

1

Requirement description

A map or filter operator in the Spark program references a member function or variable of a class, so every member of that class must be serializable; since some of those member variables are not serializable, Task serialization fails.

To verify this, we wrote the sample program below.

The class filters a list of domain names (an RDD) down to those under a given top-level domain (rootDomain, e.g. .com, .cn, .org); the top-level domain is specified when the function is called.

Code 1

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest1 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest1")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest1 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest1("com")
    test.getResult()
    test.stop()
  }
}

Log 1

As analyzed above, because the closure depends on a member variable of the current class, the entire class must be serialized.

Some fields of the class are not serializable, so serialization fails.

The run matches this analysis; the error produced is shown below.

The log shows that the failure is caused by sc (the SparkContext).

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:50:42 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:50:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO SparkContext: Submitted application: MyTest1
21/07/25 15:50:43 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:50:43 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:50:43 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:50:43 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:50:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:50:43 INFO Utils: Successfully started service 'sparkDriver' on port 63559.
21/07/25 15:50:43 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:50:43 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-862d818a-ff7d-473b-a321-84ca60962ada
21/07/25 15:50:43 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:50:43 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:50:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:50:44 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:50:44 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:50:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63560.
21/07/25 15:50:44 INFO NettyBlockTransferService: Server created on 192.168.0.105:63560
21/07/25 15:50:44 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:50:44 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63560 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63560, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$filter$1(RDD.scala:439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:438)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.com$shockang$study$bigdata$spark$errors$serializable$MyTest1$$getResult(MyTest1.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1$.main(MyTest1.scala:25)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.main(MyTest1.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@23811a09)
	- field (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest1, name: sc, type: class org.apache.spark.SparkContext)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, com.shockang.study.bigdata.spark.errors.serializable.MyTest1@256aa5f2)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest1.$anonfun$getResult$1$adapted:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest1;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955, com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955@66bacdbc)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 15:50:44 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 15:50:44 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:50:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:50:45 INFO MemoryStore: MemoryStore cleared
21/07/25 15:50:45 INFO BlockManager: BlockManager stopped
21/07/25 15:50:45 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:50:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:50:45 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:50:45 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:50:45 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-f084a4b0-2f36-4e9d-a1f6-fb2bbbbd99d9

2

Code 2

To verify the conclusion above, annotate the member variables that should not be serialized with the "@transient" keyword, which tells serialization to skip those two fields (sparkConf and sc) of the class.
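The effect of @transient can also be checked outside Spark. A minimal sketch (the `RawHandle` and `Holder` names are hypothetical, with `RawHandle` standing in for a non-serializable type like SparkContext), showing that once the offending field is marked @transient, the enclosing object serializes even though the field's type does not:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for a non-serializable type such as SparkContext.
class RawHandle

class Holder extends Serializable {
  // @transient: skip this field during serialization,
  // so its non-serializable type no longer matters.
  @transient val handle: RawHandle = new RawHandle
  val name: String = "ok"
}

// Does not throw: handle is simply omitted from the byte stream.
new ObjectOutputStream(new ByteArrayOutputStream).writeObject(new Holder)
```

A caveat: after deserialization a @transient field comes back as null, so this only helps for fields the executors never actually use, which is the case for sc here.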

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest2 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest2")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest2 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest2("com")
    test.getResult()
    test.stop()
  }
}

Log 2

Running the program again, it completes normally.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:51:17 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:51:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO SparkContext: Submitted application: MyTest2
21/07/25 15:51:18 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:51:18 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:51:18 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:51:18 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:51:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:51:18 INFO Utils: Successfully started service 'sparkDriver' on port 63584.
21/07/25 15:51:18 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:51:18 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-292dce43-a0c2-4ba7-aed2-7786b9c34b6d
21/07/25 15:51:18 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:51:18 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:51:19 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:51:19 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:51:19 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:51:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63585.
21/07/25 15:51:19 INFO NettyBlockTransferService: Server created on 192.168.0.105:63585
21/07/25 15:51:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:51:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63585 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO SparkContext: Starting job: foreach at MyTest2.scala:14
21/07/25 15:51:19 INFO DAGScheduler: Got job 0 (foreach at MyTest2.scala:14) with 12 output partitions
21/07/25 15:51:19 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest2.scala:14)
21/07/25 15:51:19 INFO DAGScheduler: Parents of final stage: List()
21/07/25 15:51:19 INFO DAGScheduler: Missing parents: List()
21/07/25 15:51:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13), which has no missing parents
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.4 KiB, free 2004.6 MiB)
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1922.0 B, free 2004.6 MiB)
21/07/25 15:51:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:63585 (size: 1922.0 B, free: 2004.6 MiB)
21/07/25 15:51:19 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 15:51:19 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 15:51:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 15:51:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 15:51:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 15:51:19 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 15:51:19 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 15:51:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 15:51:19 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 15:51:19 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 15:51:19 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 15:51:19 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 15:51:19 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
21/07/25 15:51:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 15:51:19 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
www.b.com
a.com
a.com.cn
21/07/25 15:51:20 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 15:51:20 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 595 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 600 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 601 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 599 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 603 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 602 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 606 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 603 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 607 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 606 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 609 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 637 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 15:51:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/25 15:51:20 INFO DAGScheduler: ResultStage 0 (foreach at MyTest2.scala:14) finished in 0.763 s
21/07/25 15:51:20 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 15:51:20 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 15:51:20 INFO DAGScheduler: Job 0 finished: foreach at MyTest2.scala:14, took 0.807256 s
21/07/25 15:51:20 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:51:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:51:20 INFO MemoryStore: MemoryStore cleared
21/07/25 15:51:20 INFO BlockManager: BlockManager stopped
21/07/25 15:51:20 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:51:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:51:20 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:51:20 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:51:20 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-b6b9cd75-c3cc-4701-8b63-93e25180bd56

Preliminary conclusion

The examples above support the following conclusion:

When operators such as map and filter reference a class member function or variable, every member of that class must be serializable; if some member variables are not, Task serialization fails.

Conversely, once the non-serializable member variables are annotated (here with @transient), the class as a whole serializes normally and the Task not serializable error disappears.
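Besides @transient, a common idiom is to copy the needed member into a local val before the operator, so the closure captures only that value rather than `this`. A sketch under the same plain-JVM setup (the `Job` class name is hypothetical):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Deliberately NOT Serializable, like a driver-side class holding a SparkContext.
class Job(val rootDomain: String) {
  // Reading the field inside the lambda really reads this.rootDomain,
  // so the closure captures the whole (non-serializable) Job instance.
  def badFilter: String => Boolean =
    item => item.contains(rootDomain)

  // Copying the field to a local val first means the closure captures
  // only a String, which serializes fine.
  def goodFilter: String => Boolean = {
    val rd = rootDomain
    item => item.contains(rd)
  }
}

// Returns true if Java serialization of obj succeeds.
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }
```

Serializing `new Job("com").badFilter` fails while `goodFilter` succeeds, even though both closures compute the same thing.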

3

An example of referencing a member function

Member functions affect serialization the same way member variables do: referencing a member function of a class forces every member of that class to be serializable.

To verify this, we call a member function of the current class inside map; it prepends "www." to any domain name that does not already start with that prefix.

Note: rootDomain is now defined inside the getResult function, so no class member variable is referenced, which rules out the problem discussed in the previous example.
This example therefore isolates the effect of referencing a member function.
Incidentally, avoiding direct references to member variables, as done here by defining the variable inside the function, is itself one way to resolve this class of problem.

Code 3

The code below fails just like the earlier example: the sc (SparkContext) and sparkConf (SparkConf) member variables are not serializable, so serializing the class fails.

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest3 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest3")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => addWWW(item))
    result.foreach(println)
  }

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest3 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest3("com")
    test.getResult()
    test.stop()
  }
}

Log 3

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:52:26 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:52:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:52:27 INFO ResourceUtils: ==============================================================
21/07/25 15:52:27 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:52:27 INFO ResourceUtils: ==============================================================
21/07/25 15:52:27 INFO SparkContext: Submitted application: MyTest3
21/07/25 15:52:27 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:52:27 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:52:27 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:52:27 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:52:27 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:52:27 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:52:27 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:52:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:52:27 INFO Utils: Successfully started service 'sparkDriver' on port 63658.
21/07/25 15:52:27 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:52:27 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:52:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:52:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:52:27 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:52:27 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-135805bc-d9ab-4d01-a374-f5631e2cb311
21/07/25 15:52:27 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:52:27 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:52:28 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:52:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:52:28 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:52:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63659.
21/07/25 15:52:28 INFO NettyBlockTransferService: Server created on 192.168.0.105:63659
21/07/25 15:52:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:52:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63659 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63659, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.map(RDD.scala:421)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3.com$shockang$study$bigdata$spark$errors$serializable$MyTest3$$getResult(MyTest3.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3$.main(MyTest3.scala:29)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3.main(MyTest3.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@3900fa71)
	- field (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest3, name: sc, type: class org.apache.spark.SparkContext)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest3, com.shockang.study.bigdata.spark.errors.serializable.MyTest3@5c82cd4f)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest3, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest3.$anonfun$getResult$2:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest3;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest3$$Lambda$703/393481646, com.shockang.study.bigdata.spark.errors.serializable.MyTest3$$Lambda$703/393481646@3c6aa04a)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 15:52:28 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 15:52:28 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:52:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:52:28 INFO MemoryStore: MemoryStore cleared
21/07/25 15:52:28 INFO BlockManager: BlockManager stopped
21/07/25 15:52:28 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:52:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:52:28 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:52:28 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:52:28 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-43b61841-b823-4c8d-9589-725962d49c2a

4

Code 4

As before, marking the sc (SparkContext) and sparkConf (SparkConf) member variables with "@transient" so that they are skipped during serialization would let the program run.

A member function, however, offers an extra option: since this one does not depend on any member variable, it can be defined in a Scala object (similar to a static method in Java), removing the dependency on the class instance altogether.

As the example below shows, after moving addWWW into the companion object and calling it directly inside map, the program runs normally.

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest4 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest4")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest4.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest4 {

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest4("com")
    test.getResult()
    test.stop()
  }
}
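Why does the local `val rootDomain = conf` inside getResult matter? A closure that reads a class field captures the whole instance (`this`), while a closure that reads a local variable captures only that value. The Spark-free sketch below demonstrates this with plain Java serialization; the Holder class and canSerialize helper are illustrative names, not part of the original code.

```scala
import java.io._

// Deliberately NOT Serializable, like a class holding a SparkContext.
class Holder(suffix: String) {
  // Reads the field this.suffix, so the lambda captures `this` (the whole Holder).
  def bad: String => Boolean = s => s.endsWith(suffix)

  // Copies the field into a local val first, so the lambda captures only the String.
  def good: String => Boolean = {
    val local = suffix
    s => s.endsWith(local)
  }
}

// Returns true if obj survives Java serialization, false on NotSerializableException.
def canSerialize(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

val h = new Holder("com")
println(canSerialize(h.bad))  // false: the captured Holder is not serializable
println(canSerialize(h.good)) // true: only a String was captured
```

This is exactly the check Spark's ClosureCleaner performs before shipping a task, which is why the stack trace above points at ensureSerializable.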

Log 4

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:53:24 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:53:24 INFO ResourceUtils: ==============================================================
21/07/25 15:53:24 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:53:24 INFO ResourceUtils: ==============================================================
21/07/25 15:53:24 INFO SparkContext: Submitted application: MyTest4
21/07/25 15:53:24 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:53:24 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:53:24 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:53:24 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:53:24 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:53:24 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:53:24 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:53:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 63720.
21/07/25 15:53:25 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:53:25 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:53:25 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:53:25 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-44d2c8c8-0c79-4d90-b0c5-94ea0528a8ba
21/07/25 15:53:25 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:53:25 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:53:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:53:25 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:53:25 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:53:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63721.
21/07/25 15:53:25 INFO NettyBlockTransferService: Server created on 192.168.0.105:63721
21/07/25 15:53:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:53:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63721 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:26 INFO SparkContext: Starting job: foreach at MyTest4.scala:14
21/07/25 15:53:26 INFO DAGScheduler: Got job 0 (foreach at MyTest4.scala:14) with 12 output partitions
21/07/25 15:53:26 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest4.scala:14)
21/07/25 15:53:26 INFO DAGScheduler: Parents of final stage: List()
21/07/25 15:53:26 INFO DAGScheduler: Missing parents: List()
21/07/25 15:53:26 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at map at MyTest4.scala:13), which has no missing parents
21/07/25 15:53:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.6 KiB, free 2004.6 MiB)
21/07/25 15:53:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2003.0 B, free 2004.6 MiB)
21/07/25 15:53:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:63721 (size: 2003.0 B, free: 2004.6 MiB)
21/07/25 15:53:26 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 15:53:26 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at map at MyTest4.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 15:53:26 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 15:53:26 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes)
