Spark Scala Serialization Error from RDD map

Posted: 2017-07-22 21:59:45

【Question】:

I have an RDD of type RDD[((Long, Long), (Long, Long))] that I need to transform into an RDD[((Long, Long), (Long, Long, Long, Long))], where the second RDD's tuple values are computed from the values of the first RDD by a function.
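For concreteness, here is a small sketch of the shapes involved (the sample values, the SparkContext sc, and the placeholder function f are assumptions for illustration, not from the original post):

import org.apache.spark.rdd.RDD

// Assumes a SparkContext `sc` and some f: (Long, Long) => (Long, Long, Long, Long);
// the sample data is made up.
val in: RDD[((Long, Long), (Long, Long))] =
  sc.parallelize(Seq(((1L, 2L), (1L, 0L)), ((1L, 3L), (0L, 1L))))

// Desired output: same key, with the pair expanded to a 4-tuple by f
val out: RDD[((Long, Long), (Long, Long, Long, Long))] =
  in.map { case (key, (x, y)) => (key, f(x, y)) }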

I am trying to implement this with a map-based function, but I think I am doing something wrong here. Please help me fix the problem.

Here is the complete code:

package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map

class ListItemCorrelation(sc: SparkContext) extends Serializable {

  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    if (dirX.equals(1)) {
      if (dirY.equals(1)) {
        return (1, 0, 0, 0)
      } else {
        return (0, 1, 0, 0)
      }
    } else {
      if (dirY.equals(1)) {
        return (0, 0, 1, 0)
      } else {
        return (0, 0, 0, 1)
      }
    }
  }

  def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotes = sc.textFile(votes)
    val userVotesPairs = userVotes.map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1 < t._2._1)
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
    //More functionality
    return result
  }
}

object ListItemCorrelation extends Serializable {
  def main(args: Array[String]) {
    val votes = args(0)
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val job = new ListItemCorrelation(context)
    val results = job.run(votes)
    val output = args(1)
    results.saveAsTextFile(output)
    context.stop()
  }
}
When I try to run this code, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at com.ranker.correlation.listitem.ListItemCorrelation.run(ListItemCorrelation.scala:34)
    at com.ranker.correlation.listitem.ListItemCorrelation$.main(ListItemCorrelation.scala:47)
    at com.ranker.correlation.listitem.ListItemCorrelation.main(ListItemCorrelation.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@4248e66b)
    - field (class: com.ranker.correlation.listitem.ListItemCorrelation, name: sc, type: class org.apache.spark.SparkContext)
    - object (class com.ranker.correlation.listitem.ListItemCorrelation, com.ranker.correlation.listitem.ListItemCorrelation@270b6b5e)
    - field (class: com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, name: $outer, type: class com.ranker.correlation.listitem.ListItemCorrelation)
    - object (class com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 12 more

The error occurs when the following line executes:

var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))

I am very new to Scala. Please help me find the right way to do this.

【Comments】:

Push the SparkContext down as a parameter of run? With the current implementation it keeps getting dragged along, and since it is not serializable, that causes the failure.

@geek: Did you find a solution?
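A rough sketch of what that first comment suggests, with a hypothetical run signature that is not from the original post: if the SparkContext is only a local parameter rather than a field, the enclosing instance contains nothing non-serializable, so capturing it in a closure becomes harmless.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class ListItemCorrelation extends Serializable {

  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) =
    if (dirX == 1) {
      if (dirY == 1) (1, 0, 0, 0) else (0, 1, 0, 0)
    } else {
      if (dirY == 1) (0, 0, 1, 0) else (0, 0, 0, 1)
    }

  def run(sc: SparkContext, votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotesPairs = sc.textFile(votes).map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1 < t._2._1)
    // up_down is still an instance method, so `this` is captured by the closure, but the
    // instance no longer holds a SparkContext field, so serialization now succeeds.
    jn.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
  }
}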

【Answer 1】:

Put the up_down method on the companion object. When anything defined on the class is accessed inside an RDD closure, the class (and everything in it, such as the SparkContext) has to be serialized; calling an instance method counts as such an access. Using a static object avoids the problem:

package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map

object ListItemCorrelation extends Serializable {

  // up_down now lives on the object, so RDD closures can call it without
  // capturing (and serializing) any class instance.
  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    if (dirX.equals(1)) {
      if (dirY.equals(1)) {
        return (1, 0, 0, 0)
      } else {
        return (0, 1, 0, 0)
      }
    } else {
      if (dirY.equals(1)) {
        return (0, 0, 1, 0)
      } else {
        return (0, 0, 0, 1)
      }
    }
  }

  def main(args: Array[String]) {
    val votes = args(0)
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val job = new ListItemCorrelation(context)
    val results = job.run(votes)
    val output = args(1)
    results.saveAsTextFile(output)
    context.stop()
  }
}

class ListItemCorrelation(sc: SparkContext) extends Serializable {

  def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotes = sc.textFile(votes)
    val userVotesPairs = userVotes.map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1 < t._2._1)
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    // The closure now refers only to the ListItemCorrelation object, not to this instance,
    // so the SparkContext field is never pulled into it.
    var second = first.map(t => ((t._1._1, t._2._1), ListItemCorrelation.up_down(t._1._2, t._2._2)))
    //More functionality
    return result
  }
}
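To see the mechanism in isolation, here is a minimal sketch (the class, object, and method names are invented for illustration; they are not from the original post). Calling an instance method inside an RDD closure captures `this`, and with it every field of the class, including the SparkContext; calling a method on a standalone object captures nothing:

import org.apache.spark.SparkContext

// Mirrors the failing pattern: the closure calls an instance method, so it needs
// `this`, and `this` holds a non-serializable SparkContext field.
class CapturesContext(sc: SparkContext) extends Serializable {
  def double(x: Long): Long = x * 2
  def run() = sc.parallelize(Seq(1L, 2L, 3L)).map(x => double(x)) // Task not serializable
}

// Mirrors the fix: the helper lives on an object, so the closure only refers to the
// stateless object and no class instance has to be serialized.
object Helpers {
  def double(x: Long): Long = x * 2
}

class UsesObjectHelper(sc: SparkContext) extends Serializable {
  def run() = sc.parallelize(Seq(1L, 2L, 3L)).map(x => Helpers.double(x)) // works
}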
