Spark闭包与序列化

Posted bluishglc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark闭包与序列化相关的知识,希望对你有一定的参考价值。

推荐:博主历时三年倾注大量心血创作的《大数据平台架构与原型实现:数据中台建设实战》一书已由知名IT图书品牌电子工业出版社博文视点出版发行,真诚推荐给每一位读者!点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,扫码进入京东手机购书页面!

在这里插入图片描述

本文原文出处: http://blog.csdn.net/bluishglc/article/details/50945032 严禁任何形式的转载,否则将委托CSDN官方维护权益!

Spark的官方文档再三强调那些将要作用到RDD上的操作,不管它们是一个函数还是一段代码片段,它们都是“闭包”,Spark会把这个闭包分发到各个worker节点上去执行,这里涉及到了一个容易被忽视的问题:闭包的“序列化”。

显然,闭包是有状态的,这主要是指它牵涉到的那些自由变量以及自由变量依赖到的其他变量,所以,在将一个简单的函数或者一段代码片段(就是闭包)传递给类似RDD.map这样的操作前,Spark需要检索闭包内所有的涉及到的变量(包括传递依赖的变量),正确地把这些变量序列化之后才能传递到worker节点并反序列化去执行。如果在涉及到的所有的变量中有任何不支持序列化或没有指明如何序列化自己时,你就会遇到这样的错误:

org.apache.spark.SparkException: Task not serializable

在下面的例子中,我们从kafka中持续地接收json消息,并在spark-streaming中将字符串解析成对应的实体:

object App {
    private val config = ConfigFactory.load("my-streaming.conf")
    case class Person (firstName: String,lastName: String)
    def main(args: Array[String]) {
        val zkQuorum = config.getString("kafka.zkQuorum")
        val myTopic = config.getString("kafka.myTopic")
        val myGroup = config.getString("kafka.myGroup")
        val conf = new SparkConf().setAppName("my-streaming")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
        //this val is a part of closure, and it's not serializable!
        implicit val formats = DefaultFormats
        def parser(json: String) = parse(json).extract[Person].firstName
        lines.map(_._2).map(parser).print
        ....
        ssc.start()
        ssc.awaitTerminationOrTimeout(10000)
        ssc.stop()
    }

}

这段代码在执行时就会报如下错误:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$

问题的症结就在于:闭包没有办法序列化。在这个例子里,闭包的范围是:函数parser以及它所依赖的一个隐式参数: formats , 而问题就出在这个隐式参数上, 它的类型是DefaultFormats,这个类没有提供序列化和反序列自身的说明,所以Spark无法序列化formats,进而无法将task推送到远端执行。

隐式参数formats是为extract准备的,它的参数列表如下:

org.json4s.ExtractableJsonAstNode#extract[A](implicit formats: Formats, mf: scala.reflect.Manifest[A]): A = ...

找到问题的根源之后就好解决了。实际上我们根本不需要序列化formats, 对我们来说,它是无状态的。所以,我们只需要把它声明为一个全局静态的变量就可以绕过序列化。所以改动的方法就是简单地把implicit val formats = DefaultFormats的声明从方法内部迁移到App Object的字段位置上即可。

object App {
    private val config = ConfigFactory.load("my-streaming.conf")
    case class Person (firstName: String,lastName: String)
    //As Object field, global, static, no need to serialize
    implicit val formats = DefaultFormats
	
    def main(args: Array[String]) {
        val zkQuorum = config.getString("kafka.zkQuorum")
        val myTopic = config.getString("kafka.myTopic")
        val myGroup = config.getString("kafka.myGroup")
        val conf = new SparkConf().setAppName("my-streaming")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
        def parser(json: String) = parse(json).extract[Person].firstName
        lines..map(_._2).map(parser).print
	    ....
        ssc.start()
        ssc.awaitTerminationOrTimeout(10000)
        ssc.stop()
    }

}

这里再提供另外一个很好的例子:

这个例子很好演示了解决类似问题的方案:“把类成员变量拷贝一份到闭包中" ,不然整个对象都需要被序列化!

最后我们来总结一下应该如何正确的处理Spark Task闭包的序列化问题。首先你需要对Task涉及的闭包的边界要有一个清晰的认识,要尽量地控制闭包的范围和牵涉到的自由变量,一个非常值得警惕的地方是:尽量不要在闭包中直接引用一个类的成员变量和函数,这样会导致整个类实例被序列化。这样的例子在Spark文档中也有提及,如下:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

然后,一个好的组织代码的方式是:除了那些很短小的函数,尽量把复杂的操作封装到全局单一的函数体:全局静态方法或者函数对象

如果确实需要某个类的实例参与到计算过程中,则要作好相关的序列化工作。


#相关阅读
#Spark官方文档《Spark Programming Guide》解读
#Spark SQL: Error in query: undefined function错误的解决方法
# 类型类 V.S. 桥接模式:撞衫不可怕,谁丑谁尴尬

以上是关于Spark闭包与序列化的主要内容,如果未能解决你的问题,请参考以下文章

Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?

Spark-序列化依赖关系持久化

Spark——传递函数与闭包

在这个 spark 代码片段中 ordering.by 是啥意思?

2016.3.3(Spark框架预览,Scala部分应用函数闭包高阶函数,关于语义分析的一些心得)

python+spark程序代码片段