spark 01


  • debug environment: Scala, sbt, Git Bash, Eclipse Scala IDE plugin
  • spark-shell:

  1 spark-shell → spark-submit → (SparkSubmit) spark-class

  2 open the JVM → thread dump → main: SparkSubmit.main → repl.Main → SparkILoop.process: (initializeSpark → createSparkContext, sketched below)

  3 SparkContext: (1 Utils.getCallSite,  2 markPartiallyConstructed(this, config.getBoolean("spark.driver.allowMultipleContexts", false)),  3)
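A minimal sketch of what the initializeSpark → createSparkContext step boils down to; the master and app name here are illustrative, since spark-shell actually takes them from the spark-submit arguments:

import org.apache.spark.{SparkConf, SparkContext}

// rough equivalent of the sc the repl builds for you; "local[2]" and the app name are illustrative
val conf = new SparkConf().setMaster("local[2]").setAppName("Spark shell")
val sc = new SparkContext(conf)   // runs Utils.getCallSite and markPartiallyConstructed as noted above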

  • SparkConf:()
  • SparkEnv:(1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager)
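SparkConf is a plain key/value bag of spark.* settings; SparkEnv reads it when wiring up the components above. A small usage sketch with illustrative values:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("spark01-notes")
  .set("spark.executor.memory", "1g")
  .set("spark.driver.allowMultipleContexts", "false")
println(conf.get("spark.executor.memory"))   // -> 1g
println(conf.contains("spark.master"))       // -> false until setMaster / spark-submit fills it in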

 

  • Driver Program (initialization order sketched after this list)

  1 SparkConf->SparkEnv

  2 TaskScheduler

  3 dagScheduler.start()

  4 ui.start()
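All four steps run inside the SparkContext constructor; a hedged sketch of what is visible from user code afterwards (local master assumed):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("driver-init"))
println(sc.getConf.toDebugString)   // 1: the SparkConf that seeded SparkEnv
// 2/3: taskScheduler and dagScheduler are private[spark]; 4: ui.start() serves http://localhost:4040
sc.stop()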

 

 

 

  • job (end-to-end flow; WordCount sketch after this list):

  1 SparkContext  create instance sc

  2 RDD  use sc to create a new RDD

  3 apply transformations to the RDD  get a different RDD

  4 action  an action on the RDD calls sc.runJob

  5 sc.runJob → dagScheduler.runJob → submitJob  submitJob creates a JobSubmitted event and sends it to eventProcessActor

  6 eventProcessActor.processEvent

  7 submitStage  the job is split into stages by their dependencies; get the final stage, submit it and run

  8  submitMissingTasks  

  9 TaskScheduler::submitTasks  assign tasks to workers

  10 TaskSchedulerImpl  creates the backend according to the deploy mode; the backend receives ReviveOffers from TaskSchedulerImpl

  11 ReviveOffers → executor.launchTask → TaskRunner.run
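A hedged WordCount driver that walks steps 1-4 above; the input path and master are illustrative, and the collect() action is the call that reaches sc.runJob:

import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("wordcount"))   // 1
    val lines = sc.textFile("README.md")             // 2: new RDD from sc (path is illustrative)
    val counts = lines
      .flatMap(_.split(" "))                         // 3: transformations only build the lineage
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach(println)                // 4: action -> sc.runJob -> dagScheduler.runJob
    sc.stop()
  }
}

Nothing runs until collect(); remove the action and steps 5-11 above never fire.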

 

  • transformations of the RDD (WordCount lineage; checked in the sketch after this list)

  1 HadoopRDD -> MappedRDD  textFile, RDD.scala

  2 -> FlatMappedRDD  flatMap, RDD.scala

  3 -> MappedRDD  splittedText.map, RDD.scala

  4 -> PairRDDFunctions  reduceByKey, in PairRDDFunctions.scala

  5 -> ShuffledRDD

  6 -> MapPartitionsRDD
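Reusing sc from the WordCount sketch above, toDebugString prints this lineage, so the chain can be checked directly; RDD class names drift between Spark versions, so the list above is a 1.x snapshot:

val counts = sc.textFile("README.md")   // HadoopRDD -> MappedRDD
  .flatMap(_.split(" "))                // FlatMappedRDD
  .map((_, 1))                          // MappedRDD
  .reduceByKey(_ + _)                   // ShuffledRDD, via the implicit PairRDDFunctions
println(counts.toDebugString)           // prints the dependency chain, one RDD per line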

 

  • executor:

  1 executorMemory  conf.getOption().orElse().getOrElse()  (see the sketch after this list)

     taskScheduler  

     heartbeatReceiver

env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

     dagScheduler

  2 taskScheduler.start()

  3

  4
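The conf.getOption().orElse().getOrElse() chain from item 1, as a self-contained sketch; the SPARK_EXECUTOR_MEMORY fallback and the 512 MB default follow my reading of Spark 1.x, and memoryStringToMb here is a simplified stand-in for Utils.memoryStringToMb:

import org.apache.spark.SparkConf

// simplified stand-in for Utils.memoryStringToMb, handling only "Ng", "Nm", or a plain number of MB
def memoryStringToMb(s: String): Int = {
  val lower = s.trim.toLowerCase
  if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
  else if (lower.endsWith("m")) lower.dropRight(1).toInt
  else lower.toInt
}

val conf = new SparkConf()   // in SparkContext this is the context's own conf
val executorMemoryMb: Int = conf.getOption("spark.executor.memory")
  .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
  .map(memoryStringToMb)
  .getOrElse(512)            // 512 MB default when nothing is configured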

 

  • TaskScheduler:  implements FIFO & FAIR scheduling

  1 SchedulerBackend  

extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging

    1 AppClient

    2 maxCores

    3 start: (1 driverUrl, 2 args, 3 extraJavaOpts, 4 classPathEntries, 5 libraryPathEntries)

    4 sparkJavaOpts, javaOpts, command, appDesc

    5 new AppClient

    6 client.start(), waitForRegistration()

!! the conf of the client includes:

1 sc.env.actorSystem, masters

2 appName, maxCores, sc.executorMemory, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir),
command("org.apache.spark.executor.CoarseGrainedExecutorBackend", sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts),
args(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")

  2 start()  backend.start(); if spark.speculation is enabled (conf sketch below):

sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
    SPECULATION_INTERVAL milliseconds) {
  checkSpeculatableTasks()
}
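spark.speculation gates that loop and is off by default; a hedged conf sketch with the related 1.x settings (the interval is in milliseconds):

import org.apache.spark.SparkConf

// turn on the periodic checkSpeculatableTasks() loop shown above
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.interval", "100")        // SPECULATION_INTERVAL, in ms in Spark 1.x
  .set("spark.speculation.multiplier", "1.5")      // how much slower than the median a task must be
  .set("spark.speculation.quantile", "0.75")       // fraction of tasks that must finish first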

  

  • client.AppClientListener:  trait (callbacks sketched after this list)

  1 connected to the cluster or temporary disconnection  

  2 temporary disconnection due to master failure

  3 unrecoverable Application failure

  4 add an Executor

  5 remove an Executor
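A hedged sketch of the trait shape, one callback per event above; parameter lists are simplified relative to the real org.apache.spark.deploy.client.AppClientListener:

// simplified stand-in for the real trait; the scheduler backend implements these callbacks
trait AppClientListenerSketch {
  def connected(appId: String): Unit                 // 1 registered with the cluster
  def disconnected(): Unit                           // 2 temporary loss of the master (may reconnect)
  def dead(reason: String): Unit                     // 3 unrecoverable application failure
  def executorAdded(fullId: String, workerId: String, hostPort: String,
                    cores: Int, memory: Int): Unit   // 4 an executor was granted
  def executorRemoved(fullId: String, message: String,
                      exitStatus: Option[Int]): Unit // 5 an executor was lost
}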

 

  • AppClient  val ClientActor  register, re-register (message handling sketched after this list)

val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))

actor ! RegisterApplication(appDescription)

  1 RegisteredApplication  

  2 ApplicationRemoved  from the master, terminate

  3 ExecutorAdded  

  4 ExecutorUpdated  whether the executor is finished

  5 MasterChanged  from the new master

  6 StopAppClient  from AppClient::stop()
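A self-contained sketch, not Spark's actual code, of the ClientActor pattern: select the master, send RegisterApplication, then react to the messages above. The case classes are simplified stand-ins for the real DeployMessages:

import akka.actor.{Actor, ActorSelection}

// simplified stand-ins for the DeployMessages listed above
case class RegisterApplication(appDescription: String)
case class RegisteredApplication(appId: String)
case class ApplicationRemoved(message: String)
case class ExecutorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int)
case class ExecutorUpdated(id: String, state: String, message: Option[String])
case class MasterChanged(masterUrl: String)
case object StopAppClient

class ClientActorSketch(masterUrl: String) extends Actor {
  override def preStart(): Unit = {
    // register on startup; the real ClientActor retries and re-registers on timeout
    val master: ActorSelection = context.actorSelection(masterUrl)
    master ! RegisterApplication("spark01-app")
  }
  def receive = {
    case RegisteredApplication(appId)          => println(s"registered as $appId")
    case ApplicationRemoved(msg)               => println(s"removed: $msg"); context.stop(self)
    case ExecutorAdded(id, _, hostPort, _, _)  => println(s"executor $id added on $hostPort")
    case ExecutorUpdated(id, state, _)         => println(s"executor $id is now $state")
    case MasterChanged(newUrl)                 => println(s"master moved to $newUrl")
    case StopAppClient                         => context.stop(self)
  }
}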

 

  • case RegisterApplication  on the Master (handler sketched after this list)

  1 logInfo

  2 val app

ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores), where driver is the AppClient's actor

  3 registerApplication(app)  save the app's conf to the master

  4 persistenceEngine.addApplication(app) 

  5 schedule()
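A matching master-side sketch of steps 1-5, reusing the message classes from the client sketch above; AppInfoSketch, the id scheme, and schedule() are simplified stand-ins for ApplicationInfo, newApplicationId(date), and Master.schedule():

import akka.actor.{Actor, ActorRef}

case class AppInfoSketch(id: String, desc: String, driver: ActorRef)   // stand-in for ApplicationInfo

class MasterSketch extends Actor {
  private var apps = Map.empty[String, AppInfoSketch]
  private var nextAppId = 0

  def receive = {
    case RegisterApplication(desc) =>
      println(s"Registering app $desc")                           // 1 logInfo
      val app = AppInfoSketch(s"app-$nextAppId", desc, sender())  // 2 val app; driver is the AppClient's actor
      nextAppId += 1
      apps += app.id -> app                                       // 3 registerApplication(app): keep the app's conf in master state
      // 4 persistenceEngine.addApplication(app) would persist it for master recovery (omitted here)
      sender() ! RegisteredApplication(app.id)                    // ack back to the ClientActor
      schedule()                                                  // 5 hand out workers/executors
  }

  private def schedule(): Unit = ()                               // placeholder; the real logic lives in Master.schedule()
}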

 

  • schedule

  1 

 

 

Notation used in these notes: → call or next step; : definition or content; :() including; () parameter or input; tab: explanation; & at the same time as; ? not sure; ?? serious problem; →(sth) sth is a condition or parameter; !! conclusion or attention

 
