SparkEnv


SparkEnv is Spark's execution environment object; it holds the objects that Executors need at runtime. In local mode the Driver creates the Executor itself; in local-cluster and Standalone deployments the Executor is created inside the CoarseGrainedExecutorBackend process launched by a Worker. SparkEnv therefore lives in either the Driver or a CoarseGrainedExecutorBackend process. On the driver side, SparkEnv is created mainly through SparkEnv.createDriverEnv, which takes four arguments: conf, isLocal, listenerBus, and numberCores, the number of cores the driver uses to run executors in local mode.

  // Is the master URL a local-mode one?
  def isLocal: Boolean = (master == "local" || master.startsWith("local["))
  // Maintains and dispatches all kinds of events via the listener pattern
  // An asynchronous listener bus for Spark events
  private[spark] val listenerBus = new LiveListenerBus
  ...

  /**
   * The number of driver cores to use for execution in local mode, 0 otherwise.
   */
  private[spark] def numDriverCores(master: String): Int = {
    def convertToInt(threads: String): Int = {
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    }

    master match {
      case "local" => 1
      case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
      case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
      case _ => 0 // driver is not used for execution
    }
  }

  ...
  // This function allows components created by SparkEnv to be mocked in unit tests:
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }

SparkEnv is constructed in the following steps:
1. Create the security manager SecurityManager;
2. Create the RpcEnv;
3. Create the Akka-based distributed messaging system ActorSystem (note: deprecated since Spark 1.4.0);
4. Create the map output tracker MapOutputTracker;
5. Create the ShuffleManager;
6. Create the memory manager MemoryManager;
7. Create the block transfer service NettyBlockTransferService;
8. Create the BlockManagerMaster;
9. Create the block manager BlockManager;
10. Create the broadcast manager BroadcastManager;
11. Create the cache manager CacheManager;
12. Create the metrics system MetricsSystem;
13. Create the OutputCommitCoordinator;
14. Create the SparkEnv instance.

1. Create the security manager SecurityManager

SecurityManager mainly configures permissions and accounts. When Hadoop YARN is used as the cluster manager, a secret key for login has to be generated from the credentials. Finally it installs a default password authenticator for the JVM, implemented as an anonymous inner class:

  private val secretKey = generateSecretKey()
  ...

  // Set our own authenticator to properly negotiate user/password for HTTP connections.
  // This is needed by the HTTP client fetching from the HttpServer. Put here so its
  // only set once.
  if (authOn) {
    Authenticator.setDefault(
      // Create the authenticator instance and override getPasswordAuthentication
      // to supply the user name and password
      new Authenticator() {
        override def getPasswordAuthentication(): PasswordAuthentication = {
          var passAuth: PasswordAuthentication = null
          val userInfo = getRequestingURL().getUserInfo()
          if (userInfo != null) {
            val parts = userInfo.split(":", 2)
            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
          }
          return passAuth
        }
      }
    )
  }
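For completeness, here is a minimal sketch of switching this machinery on from the application side. The keys spark.authenticate and spark.authenticate.secret are standard settings read by SecurityManager; the app name and secret value are placeholders:

    import org.apache.spark.SparkConf

    // Enable shared-secret authentication; SecurityManager picks these up from SparkConf.
    val conf = new SparkConf()
      .setAppName("auth-example")
      .set("spark.authenticate", "true")                      // turn authentication on
      .set("spark.authenticate.secret", "not-a-real-secret")  // on YARN the secret is generated from the credentials instead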

2. Create the RpcEnv

Spark 1.6 introduced a new RPC architecture built around RpcEnv, RpcEndpoint and RpcEndpointRef. It wraps Akka and Netty underneath and leaves room for plugging in other transports in the future. RpcEnv is the RPC environment: every RpcEndpoint must be registered with an RpcEnv instance, which then manages the life cycle of the registered endpoints:
- register an RpcEndpoint by name or URI;
- route and handle messages;
- stop an RpcEndpoint.
The code that creates the RpcEnv:

    private[spark] val driverActorSystemName = "sparkDriver"
    private[spark] val executorActorSystemName = "sparkExecutor"
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    // clientMode says whether this end is a pure client; when it is false,
    // NettyRpcEnvFactory also starts a server while creating the RpcEnv.
    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
      clientMode = !isDriver)

The code of RpcEnv's companion object shows that two RPC implementations are supported, akka and netty, with netty as the default.

/**
 * A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
 * so that it can be created via Reflection.
 */
private[spark] object RpcEnv {

  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }

  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
    getRpcEnvFactory(conf).create(config)
  }
}
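To make the register-by-name idea concrete, here is a minimal sketch of a custom endpoint. The endpoint and message names are invented; RpcEndpoint, RpcCallContext and askWithRetry belong to Spark 1.6's private[spark] RPC API, so code like this would only compile inside Spark's own packages:

    import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

    // A hypothetical message type and endpoint used only for illustration.
    case class Echo(msg: String)

    class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      // Reply to ask-style messages; fire-and-forget messages would go to `receive`.
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case Echo(msg) => context.reply("echo: " + msg)
      }
    }

    // Registration mirrors the driver branch of registerOrLookupEndpoint below;
    // a remote side would look the reference up instead of registering it.
    val echoRef: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    val answer: String = echoRef.askWithRetry[String](Echo("ping"))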

3. Create the Akka-based distributed messaging system ActorSystem

The Actor system is no longer supported as of Spark 1.4.0: it is based on Akka, which can now be replaced by Netty.

  // TODO Remove actorSystem
  @deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0")
  val actorSystem: ActorSystem = _actorSystem

4. Create the map output tracker MapOutputTracker

MapOutputTracker tracks the output status of map-stage tasks, so that reduce-stage tasks can find the addresses of the intermediate results. Every map task and reduce task has a unique identifier, mapId and reduceId respectively. The input of a reduce task may be the output of many map tasks, and the reducer pulls blocks from the nodes where those map tasks ran; this process is the shuffle, and each shuffle has a unique shuffleId.
MapOutputTracker has two subclasses, MapOutputTrackerMaster (for the driver) and MapOutputTrackerWorker (for executors), which use different hash maps to store the metadata.

MapOutputTrackerMaster uses a TimeStampedHashMap to track the output status of each map task.

  /**
   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
   * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
   * Other than these two scenarios, nothing should be dropped from this HashMap.
   */
  // The key is the shuffleId; the Array holds one MapStatus per map task.
  // MapStatus records the BlockManagerId of the block holding the map output,
  // so a reduce task knows where to fetch the intermediate results from.
  protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
  ...
  def registerShuffle(shuffleId: Int, numMaps: Int) {
    if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
    }
  }

  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
    val array = mapStatuses(shuffleId)
    array.synchronized {
      array(mapId) = status
    }
  }

  /** Register multiple map output information for the given shuffle */
  def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
    mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
    if (changeEpoch) {
      incrementEpoch()
    }
  }

MapOutputTrackerMaster also keeps cachedSerializedStatuses, the serialized output statuses of the map tasks.

// The key is the shuffleId; the value is the byte array produced by serializing the MapStatuses.
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()

MapOutputTrackerWorker uses a ConcurrentHashMap to store the map output information fetched from the MapOutputTrackerMaster:

/**
 * MapOutputTracker for the executors, which fetches map output information from the driver's
 * MapOutputTrackerMaster.
 */
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
  protected val mapStatuses: Map[Int, Array[MapStatus]] =
    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala

The driver and the executors handle the MapOutputTracker differently:

  • If the current process is the driver, a MapOutputTrackerMaster is created, and a MapOutputTrackerMasterEndpoint is created and registered with the RpcEnv;
  • If it is an executor, a MapOutputTrackerWorker is created, and a reference to the MapOutputTrackerMasterEndpoint is looked up through RpcUtils.
    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)   // register
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)    // look up
      }
    }

    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerActor after initialization as MapOutputTrackerActor
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
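On the reduce side, the question the tracker is ultimately asked looks roughly like this. This is a sketch against the Spark 1.6 internal API; the shuffle and reduce IDs are illustrative:

    // Ask the tracker which executors hold the map output for reduce partition 0 of shuffle 0.
    // Each entry pairs a BlockManagerId (the location) with the shuffle blocks and sizes to fetch.
    val statuses = mapOutputTracker.getMapSizesByExecutorId(0, 0)
    statuses.foreach { case (blockManagerId, blocks) =>
      blocks.foreach { case (blockId, size) =>
        println(s"fetch $blockId ($size bytes) from $blockManagerId")
      }
    }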

5. Create the ShuffleManager

ShuffleManager manages the shuffle of block data, both local and remote. It is instantiated by reflection; the default is the sort-based SortShuffleManager, and setting the property spark.shuffle.manager to hash explicitly selects HashShuffleManager instead. Spark 1.6 also accepts tungsten-sort for a better shuffle implementation (as the mapping below shows, it resolves to the same SortShuffleManager class); the Tungsten work was expected to mature further in Spark 2.0.

    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
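For example, an application selects one of the short names above through SparkConf. A minimal sketch; "sort" is already the default:

    import org.apache.spark.SparkConf

    // The value is resolved through shortShuffleMgrNames before instantiateClass is called.
    val conf = new SparkConf()
      .set("spark.shuffle.manager", "hash")   // or "sort" / "tungsten-sort"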

6. Create the memory manager MemoryManager

Spark 1.6 added a new memory management model, UnifiedMemoryManager. Unlike the earlier StaticMemoryManager, where the execution and storage portions were fenced off by ratio parameters, the two sides can now borrow each other's free memory. The full source comment:

/**
 * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
 * either side can borrow memory from the other.
 *
 * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
 * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
 * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
 * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
 *
 * Storage can borrow as much execution memory as is free until execution reclaims its space.
 * When this happens, cached blocks will be evicted from memory until sufficient borrowed
 * memory is released to satisfy the execution memory request.
 *
 * Similarly, execution can borrow as much storage memory as is free. However, execution
 * memory is *never* evicted by storage due to the complexities involved in implementing this.
 * The implication is that attempts to cache blocks may fail if execution has already eaten
 * up most of the storage space, in which case the new blocks will be evicted immediately
 * according to their respective storage levels.
 */
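A small worked example of those fractions (the 4 GB heap is an assumed figure):

    // Worked example of the fractions in the comment above.
    val heapMb          = 4096L                                    // e.g. --driver-memory 4g
    val reservedMb      = 300L                                     // fixed reservation
    val unifiedMb       = ((heapMb - reservedMb) * 0.75).toLong    // spark.memory.fraction = 0.75  -> 2847 MB shared by execution and storage
    val storageRegionMb = (unifiedMb * 0.5).toLong                 // spark.memory.storageFraction = 0.5 -> 1423 MB soft storage boundary
    println(s"unified pool: $unifiedMb MB, storage region: $storageRegionMb MB")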

The choice of MemoryManager is controlled by spark.memory.useLegacyMode; by default UnifiedMemoryManager is used.

    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

7. Create the block transfer service NettyBlockTransferService

Spark 1.6 keeps only NettyBlockTransferService; NioBlockTransferService is gone. NettyBlockTransferService uses Netty's asynchronous, event-driven network framework to provide both the server and the client side for fetching sets of blocks from remote nodes.

val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
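As an illustration of what the service offers, a synchronous remote fetch looks roughly like this. This is a sketch against the Spark 1.6 internal API; the host, port, executor ID and block ID are placeholders, and the service must already have been initialized through BlockManager.initialize:

    // Blocking fetch of a single block from a remote executor; returns a ManagedBuffer.
    val buffer = blockTransferService.fetchBlockSync("worker-1", 50010, "3", "rdd_0_1")
    println(s"fetched ${buffer.size()} bytes")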

8. Create the BlockManagerMaster

BlockManagerMaster manages and coordinates the BlockManagers; the actual work is delegated to BlockManagerMasterEndpoint. The driver and the executors handle it differently, again through registerOrLookupEndpoint:

  • If the current process is the driver, the BlockManagerMasterEndpoint is created and registered with the RpcEnv;
  • If it is an executor, a reference to the endpoint is looked up from the RpcEnv.
    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

9. Create the block manager BlockManager

BlockManager manages blocks; it only becomes valid after its initialize method has been called.

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
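The initialize call itself happens later, in SparkContext, once the task scheduler has produced an application ID. A sketch of that call; env and the appId value are placeholders here:

    // Performed by SparkContext after the task scheduler starts; only then can the
    // BlockManager register itself with the BlockManagerMaster and serve blocks.
    val appId = "app-20160101120000-0000"   // illustrative application ID
    env.blockManager.initialize(appId)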

10. Create the broadcast manager BroadcastManager

BroadcastManager stores configuration and serialized RDDs, jobs, ShuffleDependencies and similar data locally, and, for fault tolerance, may also replicate them to other nodes.

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

BroadcastManager only becomes effective after its initialize method has been called.
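From the user's point of view the path into BroadcastManager is SparkContext.broadcast. A minimal usage sketch, assuming an existing SparkContext named sc:

    // Broadcast a small lookup table once and read it inside tasks.
    val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
    val mapped = sc.parallelize(Seq(1, 2, 1)).map(k => lookup.value.getOrElse(k, "?"))
    mapped.collect()  // Array("a", "b", "a")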


11. Create the cache manager CacheManager

CacheManager caches the intermediate result computed for an RDD partition; such caching comes into play during iterative computation.

val cacheManager = new CacheManager(blockManager)
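The user-facing trigger for CacheManager is persisting an RDD. A minimal sketch, again assuming an existing SparkContext named sc:

    import org.apache.spark.storage.StorageLevel

    // Persisting means each partition's computed result is handed to the BlockManager on first use.
    val data = sc.parallelize(1 to 1000000).map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
    data.count()  // first action computes and caches the partitions
    data.count()  // second action reads the cached blocks instead of recomputing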

12. Create the metrics system MetricsSystem

MetricsSystem is Spark's metrics system. As the code below shows, on the driver it is created but not started until the task scheduler provides an application ID, while on executors it is started immediately.

    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }
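As a hedged example of how the metrics system is typically configured, an application can point spark.metrics.conf at a properties file; the file path below is a placeholder, while ConsoleSink is one of Spark's built-in sinks:

    import org.apache.spark.SparkConf

    // A metrics.properties file containing, for instance,
    //   *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
    //   *.sink.console.period=10
    // would make every instance report its metrics to stdout every 10 seconds.
    val conf = new SparkConf().set("spark.metrics.conf", "/path/to/metrics.properties")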

13. Create the OutputCommitCoordinator

OutputCommitCoordinator decides whether tasks are allowed to commit their output to HDFS. The driver and the executors handle it differently:

  • If the current process is the driver, an OutputCommitCoordinator is created and its endpoint is registered with the RpcEnv;
  • If it is an executor, a reference to the OutputCommitCoordinatorEndpoint is looked up through RpcUtils, and commit requests are forwarded to the driver's OutputCommitCoordinator.
    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
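The question the coordinator ultimately answers for a committing task looks roughly like this. This is a sketch against the Spark 1.6 internal API; the stage, partition and attempt numbers are illustrative:

    // May task attempt 0 commit the output of partition 0 in stage 1?
    val allowed: Boolean = outputCommitCoordinator.canCommit(1, 0, 0)
    if (!allowed) {
      println("another attempt already holds the commit right for this partition")
    }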

14. Create the SparkEnv instance

Once all of these components are ready, the SparkEnv execution environment itself can finally be constructed:

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      actorSystem,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockTransferService,
      blockManager,
      securityManager,
      sparkFilesDir,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)
