SparkEnv
SparkEnv is Spark's execution environment object; it holds the many objects that Executors need at runtime. In local mode the Driver creates the Executor itself; in local-cluster or Standalone deployment the Worker launches a separate CoarseGrainedExecutorBackend process that creates the Executor, so a SparkEnv lives in the Driver process as well as in each CoarseGrainedExecutorBackend process. On the driver side, SparkEnv is created mainly through SparkEnv.createDriverEnv, which takes four arguments: conf, isLocal, listenerBus, and numberCores (the number of cores the driver needs to run executors in local mode).
// Whether we are running in local mode
def isLocal: Boolean = (master == "local" || master.startsWith("local["))

// Uses the listener pattern to dispatch the various Spark events
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
...
/**
 * The number of driver cores to use for execution in local mode, 0 otherwise.
 */
private[spark] def numDriverCores(master: String): Int = {
  def convertToInt(threads: String): Int = {
    if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
  }
  master match {
    case "local" => 1
    case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
    case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
    case _ => 0 // driver is not used for execution
  }
}
...
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
SparkEnv is constructed in the following steps:
1. Create the security manager SecurityManager;
2. Create the RpcEnv;
3. Create the Akka-based distributed messaging system ActorSystem (note: deprecated since Spark 1.4.0);
4. Create the map task output tracker MapOutputTracker;
5. Create the ShuffleManager;
6. Create the memory manager MemoryManager;
7. Create the block transfer service NettyBlockTransferService;
8. Create the BlockManagerMaster;
9. Create the block manager BlockManager;
10. Create the broadcast manager BroadcastManager;
11. Create the cache manager CacheManager;
12. Create the metrics system MetricsSystem;
13. Create the OutputCommitCoordinator;
14. Create the SparkEnv instance.
1. Create the security manager SecurityManager
SecurityManager handles permissions and account settings. When Hadoop YARN is used as the cluster manager, a secret key has to be generated from the credentials used for login. Finally, it installs a default password authenticator for the JVM, implemented as an anonymous inner class:
private val secretKey = generateSecretKey()
...
// Configure password authentication for HTTP connections
// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
  Authenticator.setDefault(
    // Create the authenticator; override getPasswordAuthentication to supply user name and password
    new Authenticator() {
      override def getPasswordAuthentication(): PasswordAuthentication = {
        var passAuth: PasswordAuthentication = null
        val userInfo = getRequestingURL().getUserInfo()
        if (userInfo != null) {
          val parts = userInfo.split(":", 2)
          passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
        }
        return passAuth
      }
    }
  )
}
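As a minimal sketch of the driver-side setup, assuming the standard spark.authenticate / spark.authenticate.secret properties (SecurityManager is private[spark], so this construction happens inside SparkEnv rather than in user code; the secret value is illustrative):

// Inside org.apache.spark (SecurityManager is private[spark]):
val conf = new SparkConf()
  .set("spark.authenticate", "true")            // turn on shared-secret authentication
  .set("spark.authenticate.secret", "secret")   // illustrative value; on YARN the secret is generated from credentials
val securityManager = new SecurityManager(conf) // installs the default Authenticator shown above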
2. Create the RpcEnv
Spark 1.6 introduced a new RPC architecture built around RpcEnv, RpcEndpoint and RpcEndpointRef. It wraps Akka and Netty underneath and leaves room for plugging in other transports in the future. RpcEnv is the RPC environment: every RpcEndpoint must be registered with an RpcEnv instance, which then manages the lifecycle of the registered endpoints:
- register an RpcEndpoint by name or URI;
- route and dispatch the various messages;
- stop an RpcEndpoint.
The code that creates the RpcEnv:
private[spark] val driverActorSystemName = "sparkDriver"
private[spark] val executorActorSystemName = "sparkExecutor"

val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
// clientMode indicates whether this end is a client; when it is false, NettyRpcEnvFactory
// starts a server while creating the RpcEnv.
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
  clientMode = !isDriver)
The companion object of RpcEnv shows that two RPC implementations are supported, akka and netty, with netty being the default:
/**
 * A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
 * so that it can be created via Reflection.
 */
private[spark] object RpcEnv {

  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }

  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
    getRpcEnvFactory(conf).create(config)
  }
}
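To make the registration side concrete, here is a minimal, hedged sketch of an RpcEndpoint being registered and called. EchoEndpoint and the Echo message are made up for illustration; setupEndpoint and askWithRetry are the RpcEnv/RpcEndpointRef calls used elsewhere in Spark 1.6, and the RPC classes are private[spark], so code like this lives inside Spark itself:

// Illustrative message and endpoint
case class Echo(text: String)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Reply to Echo messages with the same text.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(text) => context.reply(text)
  }
}

// Driver side: register the endpoint under a name and obtain its reference.
val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))

// Any holder of the RpcEndpointRef can now send it messages.
val answer: String = echoRef.askWithRetry[String](Echo("hello"))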
3. Create the Akka-based distributed messaging system ActorSystem
Since Spark 1.4.0 the Akka-based actor system is no longer supported: it is being replaced by the Netty-based RPC layer, and the field is only kept for backward compatibility.
// TODO Remove actorSystem
@deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0")
val actorSystem: ActorSystem = _actorSystem
4. Create the map task output tracker MapOutputTracker
MapOutputTracker tracks the output status of map-stage tasks, so that reduce-stage tasks can locate their intermediate inputs. Every map task and reduce task has a unique identifier, mapId and reduceId respectively. A reduce task's input may come from the output of many map tasks; fetching those blocks from the nodes where the map tasks ran is the shuffle, and every shuffle has a unique shuffleId.
MapOutputTracker has two subclasses: MapOutputTrackerMaster (for the driver) and MapOutputTrackerWorker (for the executors); they differ in the HashMap used to store the metadata.
MapOutputTrackerMaster uses a TimeStampedHashMap to track the output status of each map task.
/**
 * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
 * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
 * Other than these two scenarios, nothing should be dropped from this HashMap.
 */
// The key is the shuffleId; the Array holds the MapStatus of each map task.
// MapStatus records the BlockManagerId of the map output block, which is how a reduce
// task knows where to fetch the map task's intermediate output from.
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
...

def registerShuffle(shuffleId: Int, numMaps: Int) {
  if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
    throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
  }
}

def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
  val array = mapStatuses(shuffleId)
  array.synchronized {
    array(mapId) = status
  }
}

/** Register multiple map output information for the given shuffle */
def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
  mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
  if (changeEpoch) {
    incrementEpoch()
  }
}
MapOutputTrackerMaster also keeps the serialized output status of each map task in cachedSerializedStatuses:
// The key is the shuffleId; the Array holds the bytes produced by serializing each MapStatus.
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
MapOutputTrackerWorker uses a ConcurrentHashMap to store the map output information fetched from the MapOutputTrackerMaster:
/**
 * MapOutputTracker for the executors, which fetches map output information from the driver's
 * MapOutputTrackerMaster.
 */
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
  protected val mapStatuses: Map[Int, Array[MapStatus]] =
    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
}
The driver and the executors treat MapOutputTracker differently:
- if the current process is the driver, a MapOutputTrackerMaster is created, wrapped in a MapOutputTrackerMasterEndpoint and registered with the RpcEnv;
- if the current process is an executor, a MapOutputTrackerWorker is created and a reference to the MapOutputTrackerMasterEndpoint is looked up through RpcUtils.
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator) // register
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv) // look up
  }
}

val mapOutputTracker = if (isDriver) {
  new MapOutputTrackerMaster(conf)
} else {
  new MapOutputTrackerWorker(conf)
}

// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
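To see the two halves in action, here is a hedged sketch of the round trip. The shuffleId/mapId values, block sizes and BlockManagerIds are illustrative, and these classes are private[spark], so in reality it is the DAGScheduler inside Spark that performs the registration:

// Driver side: a shuffle with two map tasks is registered and their outputs recorded.
val master = mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
master.registerShuffle(0, 2)   // shuffleId 0, two map tasks
master.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-1", "host-a", 7337), Array(1024L)))
master.registerMapOutput(0, 1, MapStatus(BlockManagerId("exec-2", "host-b", 7337), Array(2048L)))

// Executor side: the MapOutputTrackerWorker asks the master endpoint for the statuses of
// shuffle 0 and learns which BlockManager holds each map output, then fetches the blocks.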
5. Create the ShuffleManager
ShuffleManager manages the shuffling of block data, both local and remote. The concrete manager is instantiated via reflection; by default this is the sort-based SortShuffleManager, but the property spark.shuffle.manager can be set to hash to use HashShuffleManager explicitly. Spark 1.6 also accepts tungsten-sort for a better shuffle implementation; Tungsten is expected to be fleshed out further in Spark 2.0.
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
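A minimal sketch of switching the shuffle implementation from the application side, using the spark.shuffle.manager property mentioned above (the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Explicitly choose the hash-based shuffle instead of the default sort-based one.
val conf = new SparkConf()
  .setAppName("shuffle-manager-demo")       // illustrative app name
  .set("spark.shuffle.manager", "hash")     // "sort" (default), "hash" or "tungsten-sort"
val sc = new SparkContext(conf)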
6. Create the memory manager MemoryManager
Spark 1.6 introduced a new memory management model, UnifiedMemoryManager. Unlike the older StaticMemoryManager, where the execution and storage regions were fixed by ratio parameters, the two regions can now borrow free memory from each other. The full comment:
/**
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
* either side can borrow memory from the other.
*
* The region shared between execution and storage is a fraction of (the total heap space - 300MB)
* configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
* within this space is further determined by `spark.memory.storageFraction` (default 0.5).
* This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
*
* Storage can borrow as much execution memory as is free until execution reclaims its space.
* When this happens, cached blocks will be evicted from memory until sufficient borrowed
* memory is released to satisfy the execution memory request.
*
* Similarly, execution can borrow as much storage memory as is free. However, execution
* memory is *never* evicted by storage due to the complexities involved in implementing this.
* The implication is that attempts to cache blocks may fail if execution has already eaten
* up most of the storage space, in which case the new blocks will be evicted immediately
* according to their respective storage levels.
*/
Which MemoryManager is used is controlled by spark.memory.useLegacyMode; by default memory is managed by UnifiedMemoryManager.
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
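A back-of-the-envelope sketch of the region sizes under the defaults quoted in the comment above (300 MB reserved, spark.memory.fraction = 0.75, spark.memory.storageFraction = 0.5); the heap size is an illustrative example:

// Example: a 4 GiB executor heap.
val heapMB          = 4096L
val reservedMB      = 300L        // reserved for Spark internals
val memoryFraction  = 0.75        // spark.memory.fraction
val storageFraction = 0.5         // spark.memory.storageFraction

val unifiedMB = ((heapMB - reservedMB) * memoryFraction).toLong  // ~2847 MB shared by execution + storage
val storageMB = (unifiedMB * storageFraction).toLong             // ~1423 MB soft storage region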
7. Create the block transfer service NettyBlockTransferService
Spark 1.6 keeps only NettyBlockTransferService; NioBlockTransferService is gone. NettyBlockTransferService is built on Netty's asynchronous, event-driven network framework and provides both a server and a client for fetching sets of blocks from remote nodes.
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
8. Create the BlockManagerMaster
BlockManagerMaster manages and coordinates the BlockManagers; the actual work is delegated to BlockManagerMasterEndpoint. The driver and the executors treat it differently, again through registerOrLookupEndpoint:
- if the current process is the driver, a BlockManagerMasterEndpoint is created and registered with the RpcEnv;
- if the current process is an executor, a reference to the driver's endpoint is looked up from the RpcEnv.
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
9. Create the block manager BlockManager
BlockManager manages blocks; it is usable only after its initialize method has been called.
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
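The initialize call happens later, once the application id is known; a hedged sketch of that call (the application id value is illustrative):

// Called from SparkContext on the driver (and from Executor on the workers)
// once the application id has been assigned by the cluster manager.
blockManager.initialize("app-20160101120000-0000")   // illustrative application id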
10. Create the broadcast manager BroadcastManager
BroadcastManager stores configuration, serialized RDDs, jobs, ShuffleDependencys and similar data locally, and may also replicate them to other nodes for fault tolerance.
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
BroadcastManager becomes usable only after its initialize method has been called.
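From the application's point of view the manager is reached through SparkContext.broadcast; a small, hedged usage sketch (sc and the data are illustrative):

// Ship a read-only lookup table to every executor once, instead of with every task.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

val total = sc.parallelize(Seq("a", "b", "a"))
  .map(key => lookup.value.getOrElse(key, 0))   // read the broadcast value on the executors
  .sum()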
11. Create the cache manager CacheManager
CacheManager caches the intermediate results computed for individual RDD partitions; this typically pays off in iterative computations that reuse the same RDD.
val cacheManager = new CacheManager(blockManager)
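The user-facing trigger is RDD.cache()/persist(); a short, hedged sketch of the iterative pattern the CacheManager serves (the path and names are illustrative):

// Compute an RDD once, cache its partitions, and reuse them across iterations.
val points = sc.textFile("hdfs:///data/points.txt")   // illustrative path
  .map(_.split(",").map(_.toDouble))
  .cache()                                            // partitions are kept via the CacheManager/BlockManager

for (i <- 1 to 10) {
  // Each iteration reads the cached partitions instead of re-reading and re-parsing the file.
  val norm = points.map(p => math.sqrt(p.map(x => x * x).sum)).sum()
  println(s"iteration $i, total norm = $norm")
}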
12. Create the metrics system MetricsSystem
MetricsSystem is Spark's metrics system. The driver and the executors create it differently:
val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
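Sources and sinks come from a metrics configuration file (conf/metrics.properties by default, overridable with spark.metrics.conf); a small, hedged sketch, with the path illustrative and the sink lines following the format of Spark's metrics.properties.template:

// Point Spark at a custom metrics configuration file; inside that file a console
// sink for all instances would look like:
//   *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
//   *.sink.console.period=10
//   *.sink.console.unit=seconds
val conf = new SparkConf().set("spark.metrics.conf", "/etc/spark/metrics.properties")  // illustrative path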
13. Create the OutputCommitCoordinator
OutputCommitCoordinator decides whether tasks are allowed to commit their output to HDFS. The driver and the executors treat it differently:
- if the current process is the driver, an OutputCommitCoordinator is created and its endpoint is registered with the RpcEnv;
- if the current process is an executor, a reference to the driver's OutputCommitCoordinatorEndpoint is looked up through RpcUtils, so commit requests are forwarded to the driver's OutputCommitCoordinator.
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
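A hedged sketch of how a commit check might flow through this coordinator; the stage/partition/attempt numbers are illustrative, and on executors the call is relayed to the driver through the endpoint reference set above:

// Asked (via SparkHadoopMapRedUtil) before a task commits its output.
val stageId = 3         // illustrative stage id
val partitionId = 7     // illustrative partition id
val attemptNumber = 0   // illustrative task attempt
val allowed = outputCommitCoordinator.canCommit(stageId, partitionId, attemptNumber)
if (allowed) {
  // commit this attempt's output to HDFS
} else {
  // another attempt won the commit; discard this attempt's output
}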
14. Create the SparkEnv instance
Once all the components are ready, the execution environment SparkEnv itself can finally be created:
val envInstance = new SparkEnv(
executorId,
rpcEnv,
actorSystem,
serializer,
closureSerializer,
cacheManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockTransferService,
blockManager,
securityManager,
sparkFilesDir,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)