Big Data Learning: Scala Object-Oriented Programming and Some Spark Code Reading and Questions
Aside:
Spark's support for object-oriented programming is excellent.
Topics:
1. A simple class;
2. Overriding getter and setter methods;
3. Using other methods to control how outside code changes a value;
4. private[this];
5. Constructors and related topics;
Straight to the code:
==========The simplest class============
scala> class HiScala{
| private var name = "Spark"
| def sayName(){println(name)}
| def getName = name
| }
defined class HiScala
scala> val scal = new HiScala
scal: HiScala = HiScala@...
scala> scal.sayName //sayName takes no parameters, so nothing needs to be passed
Spark
scala> scal.getName
res1: String = Spark
==========The simplest class, part 2============
//Even without private, name is still exposed to the outside through a public-level getter
scala> class HiScala{
| var name = "Scala"
| def sayName(){println(name)}
| def getName = name
| }
defined class HiScala
scala> val sca = new HiScala
sca: HiScala = HiScala@...
scala> print(sca.name) //Note: sca.name goes through the public-level getter, not the name field itself
Scala
scala> sca.name = "Spark"
sca.name: String = Spark
scala> sca.name
res3: String = Spark
==========Overriding the getter and setter methods============
scala> class Person{
| private var myName = "Flink"
| def name = this.myName
| def name_=(newName:String){ //no space is allowed inside name_=
| myName = newName
| println("Hi:"+myName)
| }
| }
defined class Person
scala> val rocky = new Person
rocky: Person = Person@...
scala> rocky.name
res4: String = Flink
scala> rocky.name = "Spark"
Hi:Spark
rocky.name: String = Spark
==========Overriding the getter without a setter, to control how outside code changes our value============
scala> class Person{
| private var myName = "Flink"
| def name = this.myName
| def update(newName:String){
| myName = newName
| println("Hi:"+myName)
| }
| }
defined class Person
scala> val rok = new Person
rok: Person = Person@...
scala> rok.name
res8: String = Flink
scala> rok.name = "Spark"
<console>:9: error: value name_= is not a member of Person
rok.name = "Spark"
^
scala> rok.update("Spark")
Hi:Spark
scala> rok.name
res10: String = Spark
==========private[this] ============
scala> class Person{
| private var myName = "Flink"
| def name = this.myName
| def update(newName:String){
| myName = newName
| println("Hi:"+myName)
| }
|
| def talk(p:Person) = {
| println("Hello!!!"+p.name)
| }
| }
defined class Person
scala> val p1 = new Person
p1: Person = Person@...
scala> val p2 = new Person
p2: Person = Person@...
scala> p2.update("Spark")
Hi:Spark
scala> p1.talk(p2)
Hello!!!Spark
=============>
scala> class Person{
| private[this] var name = "Flink"
| def update(newName:String){
| println("Hi:")
| }
|
| def talk(p:Person) = {
| println("Hello!!!"+p.name)
| }
| }
<console>:15: error: value name is not a member of Person
println("Hello!!!"+p.name)
//name cannot be accessed from another object: it is object-private, so even this class's own methods cannot read it on a different instance
==========Overloaded constructors (they must call the primary constructor first)============
//The constructor defined together with the class name is the primary (default) constructor
scala> class Person{
| var name = "Flink"
| var age = 10
| def update(newName:String){
| println("Hi:")
| }
|
| def this(name:String){
| this()
| this.name = name
| }
|
| def this(name:String,age:Int){
| this(name)
| this.age = age
| }
| }
defined class Person
scala> val p = new Person("Spark",12)
p: Person = Person@...
scala> p.name
res17: String = Spark
==========During primary construction, all code not placed inside a method is executed============
scala> class Person{
| println("Big Data")
| var name = "Flink"
| var age = 10
| def update(newName:String){
| println("Hi:")
| }
|
| def this(name:String){
| this()
| this.name = name
| }
|
| def this(name:String,age:Int){
| this(name)
| this.age = age
| }
| }
defined class Person
scala> new Person
Big Data
res18: Person = Person@...
==========Companion classes and companion objects============
A companion object usually holds what would be static members in Java. An object has its own default constructor (taking no parameters), and the companion object's constructor body runs only the first time the object is used, never again.
A companion class and its companion object must be defined in the same source file.
scala> class Person{
| println("Big Data")
| var name = "Flink"
| var age = 10
| def update(newName:String){
| println("Hi:")
| }
|
| def this(name:String){
| this()
| this.name = name
| }
|
| def this(name:String,age:Int){
| this(name)
| this.age = age
| }
| }
defined class Person
scala> object Person{
| println("Scala")
| var salary = 0.0
| def getSalary = salary
| }
defined module Person
warning: previously defined class Person is not a companion to object Person.
Companions must be defined together; you may wish to use :paste mode for this.
scala> Person.getSalary
Scala
res0: Double = 0.0
//val array = Array(1,2,3) actually calls apply, the companion object's factory method; in Scala such factory methods are usually used directly (a small sketch of a custom apply follows below)
scala> val array = Array(1,2,3)
array: Array[Int] = Array(1, 2, 3)
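As a hedged aside, here is a minimal sketch of writing your own apply factory in a companion object; the Book class and its fields are invented for illustration, and in the REPL both definitions need to be entered together (e.g. with :paste) so they are real companions:

```scala
class Book(val title: String, val pages: Int)

object Book {
  // Factory method: callers can write Book(...) without `new`
  def apply(title: String, pages: Int): Book = new Book(title, pages)
}

val b = Book("Programming in Scala", 883)  // goes through Book.apply
println(b.title)                           // Programming in Scala
```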
==========Inheritance and interfaces============
An abstract method must be implemented by subclasses; by convention it is best to write override when implementing it
An abstract field must be overridden by subclasses
A trait serves as an interface: mix in the first one with extends and the rest with with
Scala does not support multiple inheritance of classes, but it does support mixing in multiple traits
An abstract field in a trait has no value, so its type must be declared (a short sketch follows below)
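A minimal sketch tying these rules together (the names Animal, Pet, Tagged, and Dog are invented for illustration):

```scala
abstract class Animal {
  val name: String                // abstract field: no value, only the type is declared
  def sound(): String             // abstract method: subclasses must implement it
}

trait Pet {
  val owner: String               // abstract field in a trait, its type is required
  def describeOwner(): String = s"owned by $owner"
}

trait Tagged {
  def tag(): String = "tagged"
}

// the first mix-in uses extends, every further one uses with
class Dog(override val name: String, val owner: String) extends Animal with Pet with Tagged {
  override def sound(): String = "Woof"   // writing override here is the recommended style
}

val d = new Dog("Rex", "Alice")
println(d.sound())          // Woof
println(d.describeOwner())  // owned by Alice
```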
==========SparkContext============
The primary constructor requires at least a SparkConf
Homework:
Analyze the syntax used in the SparkContext and RDD classes yourself, to learn Scala in its most authentic form
***********SparkContext *************
~~~1、~~~
//The primary constructor must be given a SparkConf
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
To understand this first line I looked up an explanation online:
This is how traits are used. They are very powerful: anything an abstract class can do, a trait can do too, and their strength is that several of them can be mixed in.
The difference between a trait and an abstract class is that an abstract class sits in a single inheritance chain, where classes genuinely have a parent-child relationship, while a trait, as its name suggests, represents a characteristic, and many traits can be mixed in together.
SparkContext extends the Logging trait, whose introduction starts with:
Utility trait for classes that want to log data
It exists purely for logging. Under the hood it uses slf4j; inside the Spark core package it wraps slf4j's logging facilities in a Scala-style API for Spark to use. I won't read any deeper into Logging for now.
SparkContext also mixes in ExecutorAllocationClient.
What is this trait for?
/**
* A client that communicates with the cluster manager to request or kill executors.
* This is currently supported only in YARN mode.
*/
It acts as a client that requests or kills executors from the cluster manager, and is currently supported only in YARN mode. What exactly is YARN mode? People familiar with Linux clusters probably know; I don't yet, so I'll come back to it later.
There is not much code in this trait: just a pair of methods for requesting executors and a pair for killing them.
Also, regarding the ubiquitous
private[spark]
I searched online as usual and found an explanation that is easy to accept: it declares a private immutable value (for example env), where [spark] is a protected scope, meaning the value is visible inside the spark package and invisible outside it.
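A minimal sketch of package-scoped visibility in a plain source file; the packages demo and demo.inner and the class Config are made up for illustration:

```scala
package demo {
  package inner {
    class Config {
      private[demo] val env = "dev"    // visible anywhere inside package demo
      private[this] val secret = 42    // visible only to this particular instance
    }
  }

  object UseIt {
    def read(c: inner.Config): String = c.env   // compiles: we are inside package demo
    // def leak(c: inner.Config) = c.secret     // would not compile: object-private
  }
}
```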
~~~2、~~~
The first statement inside the class body:
private val creationSite: CallSite = Utils.getCallSite()
Looking further into Utils.getCallSite:
/**
* When called inside a class in the spark package, returns the name of the user code class
* (outside the spark package) that called into Spark, as well as which Spark method they called.
* This is used, for example, to tell users where in their code each RDD got created.
*
* @param skipClass Function that is used to exclude non-user-code classes.
*/
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
Taken literally it seems roughly comparable to .class in Java, though my understanding may well be off; I'll keep it as an open question.
As for how it is actually used, I'll look later; there is too much code, so for now I'll keep reading from a syntax angle.
~~~3、~~~
private val allowMultipleContexts: Boolean =
  config.getBoolean("spark.driver.allowMultipleContexts", false)
In the lecture the teacher mainly introduced getBoolean, which behaves much like getOrElse.
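A minimal sketch of the getOrElse-style lookup that getBoolean resembles; the settings map and keys are invented stand-ins, not the real SparkConf internals:

```scala
// a plain Map standing in for a configuration object
val settings = Map("spark.driver.allowMultipleContexts" -> "true")

// look up a key, falling back to a default when it is absent
def getBoolean(key: String, default: Boolean): Boolean =
  settings.get(key).map(_.toBoolean).getOrElse(default)

println(getBoolean("spark.driver.allowMultipleContexts", false)) // true
println(getBoolean("spark.some.other.flag", false))              // false (the default)
```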
~~~4、~~~
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
  val prefix = "spark.executorEnv."
  getAll.filter { case (k, v) => k.startsWith(prefix) }
    .map { case (k, v) => (k.substring(prefix.length), v) }
}
Here I mainly took a quick look at how case is used inside filter.
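A minimal sketch of the same filter/map-with-case pattern on a plain Map with made-up keys:

```scala
val all = Map(
  "spark.executorEnv.JAVA_HOME" -> "/usr/lib/jvm",
  "spark.executorEnv.PATH"      -> "/usr/bin",
  "spark.app.name"              -> "demo"
)
val prefix = "spark.executorEnv."

// { case (k, v) => ... } is a pattern-matching anonymous function that destructures each tuple
val executorEnv = all.toSeq
  .filter { case (k, _) => k.startsWith(prefix) }
  .map    { case (k, v) => (k.substring(prefix.length), v) }

executorEnv.foreach(println)   // (JAVA_HOME,/usr/lib/jvm) and (PATH,/usr/bin)
```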
~~~5、~~~
private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
This uses java.util.concurrent.atomic.AtomicBoolean from the JDK. I haven't studied multithreading much before, so I searched again and found an answer that is fairly easy to understand.
Its basic property is that in a multithreaded environment, when several threads call methods on an instance of these classes at the same time, the calls are mutually exclusive: once a thread has entered a method and is executing its instructions, it cannot be interrupted by other threads; the others wait, much like a spin lock, until the method finishes and the JVM picks another thread from the wait queue. This is only a logical way of understanding it.
The place in this file where the value is updated is the stop method, which calls:
if (!stopped.compareAndSet(false, true)) {
  logInfo("SparkContext already stopped.")
  return
}
This method does two things: 1. it compares the AtomicBoolean with the expected value and, if they match, lets the guarded statements run, essentially an if check; 2. it sets the AtomicBoolean to the update value. Most importantly, these two steps happen atomically: no internal or external statement can run between them, which provides a clean solution for multithreaded control.
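A minimal sketch of compareAndSet guarding a stop routine; the Service class is invented for illustration:

```scala
import java.util.concurrent.atomic.AtomicBoolean

class Service {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit = {
    // atomically: if stopped is still false, set it to true and return true;
    // otherwise return false, so only one caller ever runs the cleanup below
    if (!stopped.compareAndSet(false, true)) {
      println("Service already stopped.")
      return
    }
    println("Stopping service...")   // cleanup runs exactly once
  }
}

val s = new Service
s.stop()   // Stopping service...
s.stop()   // Service already stopped.
```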
~~~6、~~~
throw new IllegalStateException(
  s"""Cannot call methods on a stopped SparkContext.
     |This stopped SparkContext was created at:
     |
     |${creationSite.longForm}
     |
     |The currently active SparkContext was created at:
     |
     |$activeCreationSite
   """.stripMargin)
Here I ran into the s prefix. At first it was baffling, so I clicked through to the code:
def s(args: Any*): String = standardInterpolator(treatEscapes, args)
* a `${}` block, for example:
* {{{
* println(s"1 + 1 = ${1 + 1}")
* }}}
* will print the string `1 + 1 = 2`.
With that example it took no time to understand: it is string interpolation, i.e. formatting.
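A minimal sketch of the s interpolator, including the multi-line stripMargin style used in the snippet above:

```scala
val user  = "Spark"
val count = 3

println(s"Hello, $user")          // simple variable substitution
println(s"1 + 1 = ${1 + 1}")      // arbitrary expressions inside ${}

// multi-line string: | marks the left margin that stripMargin strips away
val msg =
  s"""User $user submitted
     |$count jobs today.
   """.stripMargin
println(msg)
```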
~~~7、~~~
/**
* Create a SparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
*/
def this() = this(new SparkConf())
This no-argument auxiliary constructor shows that the primary constructor always needs a SparkConf; here one is simply built from system properties.
~~~8、~~~
def setLogLevel(logLevel: String) {
  val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
  if (!validLevels.contains(logLevel)) {
    throw new IllegalArgumentException(
      s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}")
  }
  Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel))
}
This is where the log level can be set.
~~~9、~~~
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey))) } {
  executorEnvs(envKey) = value
}
I read this purely to see what Option is:
def apply[A](x: A): Option[A] = if (x == null) None else Some(x)
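A minimal sketch of what Option.apply buys you when wrapping possibly-null values; System.getenv stands in here for any nullable Java API:

```scala
// Option(...) turns null into None and anything else into Some(value)
val home: Option[String]    = Option(System.getenv("HOME"))
val missing: Option[String] = Option(System.getenv("PROBABLY_NOT_SET"))

println(home.getOrElse("no HOME set"))
println(missing.getOrElse("fallback value"))   // prints the fallback

// orElse chains alternatives, as in the for comprehension above
val testing = Option(System.getenv("SPARK_TESTING"))
  .orElse(Option(System.getProperty("spark.testing")))
println(testing)   // None unless one of the two is set
```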
~~~10、~~~
Some(Utils.getThreadDump())
Here I came across Some:
/** Class `Some[A]` represents existing values of type
* `A`.
*
* @author Martin Odersky
* @version 1.0, 16/07/2003
*/
@SerialVersionUID(1234815782226070388L) // value computed by serialver for 2.11.2, annotation added in 2.11.4
final case class Some[+A](x: A) extends Option[A] {
def isEmpty = false
def get = x
}
Looking at this comment, could this be something like the null-handling pattern from Java???
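Roughly, yes: Option/Some/None replace null checks. A minimal sketch of pattern matching on it, with an invented findUser function:

```scala
def findUser(id: Int): Option[String] =
  if (id == 1) Some("rocky") else None   // None instead of returning null

findUser(1) match {
  case Some(name) => println(s"found $name")   // found rocky
  case None       => println("no such user")
}
```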
That is as much of SparkContext as I'll read for now; the underlying principles will probably come in later lessons. Next, a look at the RDD source code.
***********RDD *************
~~~1、~~~
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
An abstract class with a generic type parameter.
A field marked with the transient keyword is excluded from serialization; a static field is never serialized, whether or not it is marked transient.
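A minimal sketch of @transient with plain Java serialization, assuming it is compiled as a small standalone script; the Session class is invented for illustration:

```scala
import java.io._

class Session(val user: String, @transient val cache: StringBuilder) extends Serializable

// round-trip through Java serialization to see which fields survive
val bytes = new ByteArrayOutputStream()
val oos   = new ObjectOutputStream(bytes)
oos.writeObject(new Session("rocky", new StringBuilder("temp data")))
oos.close()

val ois      = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val restored = ois.readObject().asInstanceOf[Session]

println(restored.user)    // rocky
println(restored.cache)   // null: the transient field was skipped during serialization
```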
~~~2、~~~
private val nextRddId = new AtomicInteger(0)
The atomic classes were already explained above with stopped; here AtomicInteger provides the atomic increment.
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()