大数据学习:Scala面向对象和Spark一些代码读和问

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据学习:Scala面向对象和Spark一些代码读和问相关的知识,希望对你有一定的参考价值。

画外音:

Spark对面向对象的支持是非常完美的

主题:

1、简单的类;

2、重写getter、setter方法;

3、利用其它方法来控制外部对值的控制;

4、 private[this];

5、构造器以及构造器相关;



直接代码见真章:

==========最简单的类============

scala> class HiScala{

     | private var name = "Spark"

     | def sayName(){println(name)}

     | def getName = name         

     | }

defined class HiScala

scala> val scal = new HiScala              

scal: HiScala = [email protected]

scala> scal.sayName                  //sayName无参数,可以不传

Spark

scala> scal.getName

res1: String = Spark

==========最简单的类2============

//就算没有private,name也是,对外是public级别的getName

scala> class HiScala{

     | var name = "Scala"

     |  def sayName(){println(name)}

     |  def getName = name

     | }

defined class HiScala

scala> val sca = new HiScala

sca: HiScala = [email protected]

scala> print(sca.name)            //注意:sca.name访问的是public级别的getName,而非name本身

Scala

scala> sca.name = "Spark"

sca.name: String = Spark

scala> sca.name

res3: String = Spark

==========重写getter和setter方法============

scala> class Person{

     | private var myName = "Flink"

     | def name = this.myName

     | def name_=(newName:String){     //这里_=中间不能有空格

     | myName = newName

     | println("Hi:"+myName)

     | }

     | }

defined class Person

scala> val rocky = new Person

rocky: Person = [email protected]

scala> rocky.name

res4: String = Flink

scala> rocky.name = "Spark"

Hi:Spark

rocky.name: String = Spark

==========重写getter和不用setter方法,用来控制外接对我们值的改变============

scala> class Person{

     | private var myName = "Flink"

     | def name = this.myName

     | def update(newName:String){

     | myName = newName

     | println("Hi:"+myName)

     | }

     | }

defined class Person

scala> rok.name

res8: String = Flink

scala> rok.name = "Spark"

<console>:9: error: value name_= is not a member of Person

       rok.name = "Spark"

           ^

scala> rok.update("Spark")

Hi:Spark

scala> rok.name

res10: String = Spark

==========private[this] ============

scala> class Person{

     | private var myName = "Flink"

     | def name = this.myName

     | def update(newName:String){

     | myName = newName

     | println("Hi:"+myName)

     | }

     |

     | def talk(p:Person) = {

     | println("Hello!!!"+p.name)

     | }

     | }

defined class Person

scala> val p1 = new Person

p1: Person = [email protected]

scala> val p2 = new Person

p2: Person = [email protected]

scala> p2.update("Spark")

Hi:Spark

scala> p1.talk(p2)

Hello!!!Spark

=============>

scala> class Person{

     | private[this] var name = "Flink"

     | def update(newName:String){

     | println("Hi:")

     | }

     |

     | def talk(p:Person) = {

     | println("Hello!!!"+p.name)

     | }

     | }

<console>:15: error: value name is not a member of Person      

       println("Hello!!!"+p.name)

//name不能被其它对象访问到,是对象私有的,这个对象的方法也不行

==========重载构造器(必须首先使用默认构造器)============

//与类名放在一起的就是默认构造器

scala> class Person{

     | var name = "Flink"

     | var age = 10

     | def update(newName:String){

     | println("Hi:")

     | }

     |

     | def this(name:String){

     | this()

     | this.name = name

     | }

     |

     | def this(name:String,age:Int){

     | this(name)

     | this.age = age

     | }

     | }

defined class Person

scala> val p = new Person("Spark",12)

p: Person = [email protected]

scala> p.name

res17: String = Spark

========== 默认构造的时候没放在方法的代码都会执行============

scala> class Person{

     | println("Big Data")

     | var name = "Flink"

     | var age = 10

     | def update(newName:String){

     | println("Hi:")

     | }

     |

     | def this(name:String){

     | this()

     | this.name = name

     | }

     |

     | def this(name:String,age:Int){

     | this(name)

     | this.age = age

     | }

     | }

defined class Person

scala> new Person

Big Data

res18: Person = [email protected]

========== 伴生类和伴生对象============

伴生对象一般放静态类和静态成员,object有自己默认的构造器(不带任何参数) ,伴生对象的构造器只会在第一次调用的时候执行,以后不会执行

伴生类和伴生对象必须在同一个文件

scala> class Person{

     | println("Big Data")

     | var name = "Flink"

     | var age = 10

     | def update(newName:String){

     | println("Hi:")

     | }

     |

     | def this(name:String){

     | this()

     | this.name = name

     | }

     |

     | def this(name:String,age:Int){

     | this(name)

     | this.age = age

     | }

     | }

defined class Person

scala> object Person{

     | println("Scala")

     | var salary = 0.0

     | def getSalary = salary

     | }

defined module Person

warning: previously defined class Person is not a companion to object Person.

Companions must be defined together; you may wish to use :paste mode for this.

scala> Person.getSalary

Scala

res0: Double = 0.0

//val array = Array(1,2,3)其实调用的apply,就是伴生对象的工厂方法,Scala中一般都直接用工厂方法

scala> val array = Array(1,2,3)

array: Array[Int] = Array(1, 2, 3)

==========继承、接口============

抽象的方法,子类必须实现,实现的时候规范上最好写override

抽象的属性,子类必须override

trait接口,第一个用extends后面用with混入

Scala不支持多重集成,但是支持集成多个trait

trait抽象字段,没有值,必须说明它的类型

==========SparkContext============

默认构造器,至少要有一个SparkConf

作业:

自己分析SparkContext和RDD类中写的语法,来学习Scala最正宗的语法

***********SparkContext *************

~~~1、~~~

//默认构造器必须要传SparkConf

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

理解第一句的时候特意找了下网上解释:

trait的使用方法就是这样子了,它很强大,抽象类能做的事情,trait都可以做。它的长处在于可以多继承。

trait和抽象类的区别在于抽象类是对一个继承链的,类和类之前确实有父子类的继承关系,而trait则如其名字,表示一种特征,可以多继承。


继承了Logging接口,Logging接口的介绍首先就说

Utility trait for classes that want to log data

完全为了记录日志数据用的,它其实用的是slf4j,在Spark核心包中,基于slf4j将里面记录日志的一些方式以Scala的方式来提供给Spark用,Logging深入暂时不深入阅读


继承了ExecutorAllocationClient

这个接口干嘛用的呢?

/**

 * A client that communicates with the cluster manager to request or kill executors.

 * This is currently supported only in YARN mode.

 */

就是作为客户端在集群模式下请求或者杀掉执行者,只能在YARN模式下跑——YARN是啥模式?LINUX比较熟悉的人应该知道,但是我暂时不知道,以后再说。

这个接口中我看代码较少,就请求和杀掉分别两个方法。


另外,对于随处可见的

private [spark]

照例问度娘,得到了比较容易接受的解释: 定义了一个私有的不可变量env,这里的[spark],是一种保护的作用域,这个意思是,env这个量在包spark是可见的,在包spark之外是不可见的。

~~~2、~~~

类下面第一句

private val creationSite: CallSite = Utils. getCallSite()

往后看了下Utils. getCallSite

/**

   * When called inside a class in the spark package, returns the name of the user code class

   * (outside the spark package) that called into Spark, as well as which Spark method they called.

   * This is used, for example, to tell users where in their code each RDD got created.

   *

   * @param skipClass Function that is used to exclude non-user-code classes.

   */

  def getCallSite( skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {

从字面理解来看应该是JAVA中的.class差不多,或者我理解不一定正确——当作一个问题吧

至于应用,后续再看,代码量太大,先从语法角度往下看吧

~~~3、~~~

 private val allowMultipleContexts : Boolean =

    config.getBoolean("spark.driver.allowMultipleContexts", false)

老师讲课主要介绍了getBoolean ,就像getOrElse一样

~~~4、~~~

/** Get all executor environment variables set on this SparkConf */

  def getExecutorEnv: Seq[(String , String)] = {

    val prefix = "spark.executorEnv."

    getAll. filter{case (k, v) => k.startsWith (prefix)}

          . map{ case (k , v ) => (k.substring(prefix.length), v)}

  }

这里主要是稍微了解了下filter中case的用法

~~~5、~~~

private [spark] val stopped: AtomicBoolean = new AtomicBoolean( false)


这里主要用了 java.util.concurrent.atomic.AtomicBoolean,这是java中的,至于做啥的,之前可能在多线程方面的研究不是很多,继续度娘,找到一个比较容易理解的答案。

其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解

这个文件中有更新的地方,就是在stop方法中,调用是

if (!stopped .compareAndSet (false, true)) {

      logInfo("SparkContext already stopped.")

      return

    }

这个方法主要两个作用         1. 比较AtomicBoolean和expect的值,如果一致,执行方法内的语句。其实就是一个if语句         2. 把AtomicBoolean的值设成update         比较最要的是这两件事是一气呵成的,这连个动作之间不会被打断,任何内部或者外部的语句都不可能在两个动作之间运行。为多线程的控制提供了解决的方案。

~~~6、~~~

throw new IllegalStateException(

        s"""Cannot call methods on a stopped SparkContext.

           |This stopped SparkContext was created at:

           |

           |$ {creationSite .longForm }

           |

           |The currently active SparkContext was created at:

           |

           |$ activeCreationSite

         """ .stripMargin )

这个里面见到了s这个,开始云里雾里,点击进去看代码

def s(args: Any*): String = standardInterpolator(treatEscapes, args)

   *  a `${}` block, for example:

   *  {{{

   *    println(s"1 + 1 = ${1 + 1}")

   *  }}}

   *  will print the string `1 + 1 = 2`.

对照这个例子,分分钟理解了,就是格式化

~~~7、~~~

/**

   * Create a SparkContext that loads settings from system properties (for instance, when

   * launching with ./bin/spark-submit).

   */

  def this() = this(new SparkConf())

默认构造器说明了必须需要sparkConf

~~~8、~~~

def setLogLevel (logLevel : String) {

    val validLevels = Seq( "ALL", "DEBUG" , "ERROR" , "FATAL" , "INFO" , "OFF" , "TRACE", "WARN" )

    if (! validLevels.contains (logLevel )) {

      throw new IllegalArgumentException (

        s "Supplied level $logLevel did not match one of: ${validLevels. mkString( ",")}" )

    }

    Utils.setLogLevel(org.apache.log4j.Level. toLevel( logLevel))

  }

里面有设置日志级别的

~~~9、~~~

// since we can‘t set env vars directly in sbt.

    for { ( envKey, propKey) <- Seq(( "SPARK_TESTING", "spark.testing" ))

      value <- Option(System.getenv (envKey )).orElse (Option(System.getProperty( propKey)))} {

      executorEnvs( envKey) = value

    }

完全是为了看Option是啥,

def apply[ A](x : A): Option[ A] = if (x == null) None else Some(x)

~~~10、~~~

 Some( Utils.getThreadDump ())

看到个Some

/** Class `Some[A]` represents existing values of type

 *  `A`.

 *

 *  @author  Martin Odersky

 *  @version 1.0, 16/07/2003

 */

@SerialVersionUID( 1234815782226070388L) // value computed by serialver for 2.11.2, annotation added in 2.11.4

final case class Some[+ A]( x: A) extends Option[A ] {

  def isEmpty = false

  def get = x

}

看这注释,难道是JAVA里面的空指针模式???

SparkContext先看这么多吧,里面原理什么的估计得后面课程看,再看下RDD源码

***********RDD *************

~~~1、~~~

abstract class RDD[T: ClassTag](

    @transient private var _sc : SparkContext,

    @transient private var deps : Seq[Dependency[_]]

  ) extends Serializable with Logging {

抽象类,参数泛化

transient关键字修饰的变量不再能被序列化,一个静态变量不管是否被transient修饰,均不能被序列化

~~~2、~~~

private val nextRddId = new AtomicInteger( 0)

在上面的stopped里面已经解释过了,它的递增方式

/** Register a new RDD, returning its RDD ID */

  private[spark] def 大数据学习之Scala语言基本语法学习36

Scala学习-变量常量运算符流程控制和函数

Scala简介及基础语法

大数据学习:Scala隐式转换和并发编程(DT大数据梦工厂)

Scala基本使用

大数据入门:Java和Scala编程对比