Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化

Posted

技术标签:

【中文标题】Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化【英文标题】:Scala error: Exception in thread "main" org.apache.spark.SparkException: Task not serializable 【发布时间】:2021-12-20 11:06:39 【问题描述】:

运行此代码时出现不可序列化错误:

import org.apache.spark.SparkContext, SparkConf
import scala.collection.mutable.ArrayBuffer
object Task1 
  def findHighestRatingUsers(movieRating: String): (String) = 
    val tokens = movieRating.split(",", -1)
    val movieTitle = tokens(0)
    val ratings = tokens.slice(1, tokens.size)
    val maxRating = ratings.max
    var userIds = ArrayBuffer[Int]()

    for(i <- 0 until ratings.length)
      if (ratings(i) == maxRating) 
        userIds += (i+1)
      
    

    return movieTitle + "," + userIds.mkString(",")

    return movieTitle
  

  def main(args: Array[String]) 

    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val Lines = sc.textFile(args(0))


    val TitleAndMaxUserIds = Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))
  

错误发生在以下行:

val TitleAndMaxUserIds =Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))

我相信这是由于函数“findHighestRatingUsers”中的某些原因造成的。有人可以解释为什么以及如何解决它吗?

异常中的更多信息如下:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
    at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.map(RDD.scala:395)
    at Task1$.main(Task1.scala:63)
    at Task1.main(Task1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Task1$
Serialization stack:
    - object not serializable (class: Task1$, value: Task1$@3c770db4)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Task1$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Task1$.$anonfun$main$1:(LTask1$;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class Task1$$$Lambda$1023/20408451, Task1$$$Lambda$1023/20408451@4f59a516)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 22 more

我查看了这篇文章 Difference between object and class in Scala 并尝试使用对象来封装函数:

import org.apache.spark.SparkContext, SparkConf
import scala.collection.mutable.ArrayBuffer

object Function
    def _findHighestRatingUsers(movieRating: String): (String) = 
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      var userIds = ArrayBuffer[Int]()

      for(i <- 0 until ratings.length)
        if (ratings(i) == maxRating) 
          userIds += (i+1)
        
      

      return movieTitle + "," + userIds.mkString(",")
    



object Task1 

  def main(args: Array[String]) 
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val textFile = sc.textFile(args(0))

    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  

但仍然出现异常,出现大量错误...


这次我尝试将对象函数放在对象task1中,如下所示:

import org.apache.spark.SparkContext, SparkConf
import scala.collection.mutable.ArrayBuffer
object Task1 
    
  object Function
    def _findHighestRatingUsers(movieRating: String): (String) = 
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      var userIds = ArrayBuffer[Int]()

      for(i <- 0 until ratings.length)
        if (ratings(i) == maxRating) 
          userIds += (i+1)
        
      

      return movieTitle + "," + userIds.mkString(",")
    
  

  def main(args: Array[String]) 
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val textFile = sc.textFile(args(0))

    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  

问题解决了。但是我还是不知道为什么嵌套对象解决了这个问题。有人可以解释一下吗? 此外,我有几点不确定:

    scala 中的主要功能是什么?是节目的入口吗? 为什么要用对象来描述主函数? 谁能给出一个包含函数、类或一些基本组件的 Scala 程序的通用结构?

【问题讨论】:

你能发布完整的堆栈跟踪吗?您是否尝试将方法 findHighestRatingUsers 移动到与包含 main 方法的对象不同的对象中? 您使用的是什么版本的 Java? 我已经在@GaëlJ 上面发布了错误。我不知道这些错误的含义。 它的 java8 @tjheslin1 什么异常。你在第二个代码示例中得到了堆栈跟踪吗? 【参考方案1】:

首先,我建议您阅读有关 Scala 和 Spark 的文档来熟悉它,因为您的问题突出表明您才刚刚开始使用它。

我将为您关于“任务不可序列化”的原始问题提供一些见解(但不会准确回答),并让您为您稍后在帖子中添加的问题打开其他问题,否则此答案将搞得一团糟。

您可能知道,Spark 允许分布式计算。为此,Spark 所做的一件事是获取您编写的代码,对其进行序列化,然后将其发送给某处的一些执行程序以实际运行它。这里的关键部分是您的代码必须是可序列化的

你得到的错误是告诉你 Spark 不能序列化你的代码。

现在,如何使其可序列化?这就是它变得具有挑战性的地方,即使 Spark 试图通过提供“序列化堆栈”来帮助您,但有时它提供的信息并没有那么有用。

在您的情况下(代码的第一个示例),findHighestRatingUsers 必须被序列化,但要这样做,它必须序列化整个不可序列化的 object Task1

为什么Task1 不可序列化?我承认我不太确定,但我会打赌main 方法,虽然我希望你的第二个例子可以序列化。

您可以在网络上的各种文档或博客文章中阅读更多相关信息。例如:https://medium.com/swlh/spark-serialization-errors-e0eebcf0f6e6

【讨论】:

以上是关于Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化的主要内容,如果未能解决你的问题,请参考以下文章

运行 HIVE 命令抛出错误:线程“主”java.lang.RuntimeException 中的异常:org.apache.hadoop.hive.ql.metadata.HiveException

Spark/scala 中的 SQL 查询

intellij 中 spark scala 应用程序中的线程“main”java.lang.NoClassDefFoundError:org/apache/spark/sql/catalyst/St

scala.collection.immutable.Iterable[org.apache.spark.sql.Row] 到 DataFrame ?错误:使用替代方法重载了方法值 createDat

scala中的Flink Kafka程序给出超时错误org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败

forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员