Hadoop 三大调度器分析
Posted 秦时明月0515
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop 三大调度器分析相关的知识,希望对你有一定的参考价值。
如要转载,请注上作者和出处。 由于能力有限,如有错误,请大家指正。
须知: 我们下载的是hadoop-2.7.3-src 源码。 这个版本默认调度器是Capacity调度器。 在2.0.2-alpha版本的时候,有人汇报了一个fifo调度器的bug,社区把默认调度器从原来的fifo切换成capacity了。 参考
在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器。当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheduler和FairScheduler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器。本文介绍了Hadoop调度器的基本编写方法, 参考1
Hadoop1 调度框架:Hadoop的调度器是在JobTracker中加载和调用的,用户可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler属性中指定调度器。本节分析了Hadoop调度器的调度框架,实际上分析了两个重要类:TaskScheduler和JobTracker的关系。
Hadoop2 调度框架:Hadoop的调度器是在ResourceManager中加载和调用的,用户可以在配置文件yarn-site.xml中的yarn.resourcemanager.scheduler.class属性中指定调度器,默认是 org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler ; 还可以配置Fifo调度器,org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler ; 还可以配置Fair调度器, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler 。 本节分析了Hadoop调度器的调度框架, 类比Hadoop1 , 三个调度器的共同扩展类是 AbstractYarnScheduler <T extends SchedulerApplicationAttempt, N extends SchedulerNode> , 它的功能类似Hadoop1的TaskScheduler ; 如果用户要编写自己的调度器,需要继承抽象类AbstractYarnScheduler。
MapReduce在Hadoop2中称为MR2或YARN,将JobTracker中的资源管理及任务生命周期管理(包括定时触发及监控),拆分成两个独立的服务,用于管理全部资源的ResourceManager以及管理每个应用的ApplicationMaster,ResourceManager用于管理向应用程序分配计算资源,每个ApplicationMaster用于管理应用程序、调度以及协调。一个应用程序可以是经典的MapReduce架构中的一个单独的任务,也可以是这些任务的一个DAG(有向无环图)任务。ResourceManager及每台机上的NodeManager服务,用于管理那台机的用户进程,形成计算架构。每个应用程序的ApplicationMaster实际上是一个框架具体库,并负责从ResourceManager中协调资源及与NodeManager(s)协作执行并监控任务。 参考2
针对Hadoop 1.0中的MapReduce在扩展性和多框架支持等方面的不足,它将JobTracker中的资源管理和作业控制功能分开,分别由组件ResourceManager和ApplicationMaster实现,其中,ResourceManager负责所有应用程序的资源分配,而ApplicationMaster仅负责管理一个应用程序,进而诞生了全新的通用资源管理框架YARN。基于YARN,用户可以运行各种类型的应用程序(不再像1.0那样仅局限于MapReduce一类应用),从离线计算的MapReduce到在线计算(流式处理)的Storm等。Hadoop 2.0对应Hadoop版本为Apache Hadoop 0.23.x、2.x和CDH4。
架构图:
其中ResourceManager包含两个主要的组件:定时调用器(Scheduler)以及应用管理器(ApplicationManager)。
定时调用器(Scheduler): 定时调度器负责向应用程序分配置资源,它不做监控以及应用程序的状 态跟踪,并且它不保证会重启由于应用程序本身或硬件出错而执行失败 的应用程序。
应用管理器(ApplicationManager): 应用程序管理器负责接收新任务,协调并提供在ApplicationMaster容 器失败时的重启功能。
节点管理器(NodeManager): NodeManager是ResourceManager在每台机器的上代理,负责容器的管 理,并监控他们的资源使用情况(cpu,内存,磁盘及网络等),以及向 ResourceManager/Scheduler提供这些资源使用报告。
应用总管(ApplicationMaster): 每个应用程序的ApplicationMaster负责从Scheduler申请资源,以及 跟踪这些资源的使用情况以及任务进度的监控。
1 调度器
我们先想分析调度器,首先要分析它的父类,以及父类的父类和实现接口,如 AbstractService, YarnScheduler, ResourceScheduler 以及 AbstractYarnScheduler, 如下所示:
AbstractService.java 在 hadoop-2.7.3-src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/AbstractService.java
package org.apache.hadoop.service; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import com.google.common.annotations.VisibleForTesting; /** * This is the base implementation class for services. */ //这是服务的基本实现类。 @Public @Evolving public abstract class AbstractService implements Service { private static final Log LOG = LogFactory.getLog(AbstractService.class); /** * Service name. */ //服务名称 private final String name; /** service state */ //服务状态 private final ServiceStateModel stateModel; /** * Service start time. Will be zero until the service is started. */ //服务开始时间。在服务开始之前为0。 private long startTime; /** * The configuration. Will be null until the service is initialized. */ //配置。在服务初始化之前为null。 private volatile Configuration config; /** * List of state change listeners; it is final to ensure * that it will never be null. */ //状态更改侦听器列表;最终确保它不为null。 private final ServiceOperations.ServiceListeners listeners = new ServiceOperations.ServiceListeners(); /** * Static listeners to all events across all services */ //所有服务的所有事件的静态监听器 private static ServiceOperations.ServiceListeners globalListeners = new ServiceOperations.ServiceListeners(); /** * The cause of any failure -will be null. * if a service did not stop due to a failure. */ //任何失败的原因 - 是因为null。 如果服务没有因为故障停止。 private Exception failureCause; /** * the state in which the service was when it failed. * Only valid when the service is stopped due to a failure */ //服务失败时的状态。仅当服务由于失败而停止时才有效。 private STATE failureState = null; /** * object used to co-ordinate {@link #waitForServiceToStop(long)} * across threads. */ //对象用于协调 {@link #waitForServiceToStop(long)} 跨线程。 private final AtomicBoolean terminationNotification = new AtomicBoolean(false); /** * History of lifecycle transitions */ //生命周期转换的历史 private final List<LifecycleEvent> lifecycleHistory = new ArrayList<LifecycleEvent>(5); /** * Map of blocking dependencies */ //阻止依赖关系的映射 private final Map<String,String> blockerMap = new HashMap<String, String>(); private final Object stateChangeLock = new Object(); /** * Construct the service. * @param name service name */ //构造服务 public AbstractService(String name) { this.name = name; stateModel = new ServiceStateModel(name); } /* * 获取当前的服务状态。 * 返回:服务的状态 */ @Override public final STATE getServiceState() { return stateModel.getState(); } /* * 获取服务失败时引发的第一个异常。 如果为空,则不记录任何异常 * 返回:在转换到停止状态期间日志记录的故障 */ @Override public final synchronized Throwable getFailureCause() { return failureCause; } /* * 获取发生在{@link #getFailureCause()}中失败的状态。 * 返回:状态,如果没有失败,则为null */ @Override public synchronized STATE getFailureState() { return failureState; } /** * Set the configuration for this service. * This method is called during {@link #init(Configuration)} * and should only be needed if for some reason a service implementation * needs to override that initial setting -for example replacing * it with a new subclass of {@link Configuration} * @param conf new configuration. */ /* * 设置此服务的配置。当{@link #init(Configuration)}时该方法会被调用并且 * 只有在某些原因出现,服务实现需要覆盖该初始设置的情况下才需要这样做 - 例如 * 用{@link Configuration}的新子类替换它。 */ protected void setConfig(Configuration conf) { this.config = conf; } /** * {@inheritDoc} * This invokes {@link #serviceInit} * @param conf the configuration of the service. This must not be null * @throws ServiceStateException if the configuration was null, * the state change not permitted, or something else went wrong */ //这将调用{@link #serviceInit} //子类的serviceInit会初始化所需服务,会创建相应的服务类然后加入服务列表 @Override public void init(Configuration conf) { //服务配置是否为空 if (conf == null) { throw new ServiceStateException("Cannot initialize service " + getName() + ": null configuration"); } //服务是否已经初始化 if (isInState(STATE.INITED)) { return; } synchronized (stateChangeLock) { if (enterState(STATE.INITED) != STATE.INITED) { setConfig(conf); try { //服务初始化,会进入子类的同名函数 serviceInit(config); if (isInState(STATE.INITED)) { //if the service ended up here during init, //notify the listeners notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } } /** * {@inheritDoc} * @throws ServiceStateException if the current service state does not permit * this action */ //开始服务 @Override public void start() { if (isInState(STATE.STARTED)) { return; } //enter the started state synchronized (stateChangeLock) { if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) { try { startTime = System.currentTimeMillis(); serviceStart(); if (isInState(STATE.STARTED)) { //if the service started (and isn\'t now in a later state), notify if (LOG.isDebugEnabled()) { LOG.debug("Service " + getName() + " is started"); } notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } } /** * {@inheritDoc} */ //停止服务 @Override public void stop() { if (isInState(STATE.STOPPED)) { return; } synchronized (stateChangeLock) { if (enterState(STATE.STOPPED) != STATE.STOPPED) { try { serviceStop(); } catch (Exception e) { //stop-time exceptions are logged if they are the first one, noteFailure(e); throw ServiceStateException.convert(e); } finally { //report that the service has terminated terminationNotification.set(true); synchronized (terminationNotification) { terminationNotification.notifyAll(); } //notify anything listening for events notifyListeners(); } } else { //already stopped: note it if (LOG.isDebugEnabled()) { LOG.debug("Ignoring re-entrant call to stop()"); } } } } /** * Relay to {@link #stop()} * @throws IOException */ @Override public final void close() throws IOException { stop(); } /** * Failure handling: record the exception * that triggered it -if there was not one already. * Services are free to call this themselves. * @param exception the exception */ /* * 故障处理:记录触发它的异常 - 如果还没有一个。 服务可以自由调用。 */ protected final void noteFailure(Exception exception) { if (LOG.isDebugEnabled()) { LOG.debug("noteFailure " + exception, null); } if (exception == null) { //make sure failure logic doesn\'t itself cause problems return; } //record the failure details, and log it //记录故障细节,并记录日志 synchronized (this) { if (failureCause == null) { failureCause = exception; failureState = getServiceState(); LOG.info("Service " + getName() + " failed in state " + failureState + "; cause: " + exception, exception); } } } /* * 阻止等待服务停止; 使用终止通知对象这样做。 * 该方法只有在执行所有服务停止操作(成功或失败)之后才返回,或超时已过 * 该方法可以在服务初始化或启动之前调用; 这是为了消除任何竞争条件,服务在此事件发生之前停止。 */ @Override public final boolean waitForServiceToStop(long timeout) { boolean completed = terminationNotification.get(); while (!completed) { try { synchronized(terminationNotification) { terminationNotification.wait(timeout); } // here there has been a timeout, the object has terminated, // or there has been a spurious wakeup (which we ignore) //这里有一个超时,对象已经终止了,或者有一个虚假的唤醒(我们忽略) completed = true; } catch (InterruptedException e) { // interrupted; have another look at the flag completed = terminationNotification.get(); } } return terminationNotification.get(); } /* ===================================================================== */ /* Override Points */ /* ===================================================================== */ /** * All initialization code needed by a service. * * This method will only ever be called once during the lifecycle of * a specific service instance. * * Implementations do not need to be synchronized as the logic * in {@link #init(Configuration)} prevents re-entrancy. * * The base implementation checks to see if the subclass has created * a new configuration instance, and if so, updates the base class value * @param conf configuration * @throws Exception on a failure -these will be caught, * possibly wrapped, and wil; trigger a service stop */ /* * 服务所需的所有初始化代码。 * 该方法只能在特定服务实例的生命周期中被调用一次。 * 实现不需要同步机制,因为{@link #init(Configuration))中的逻辑可以防止重新进入。 * 基本实现检查子类是否已创建新的配置实例,如果是,则更新基类值。 */ protected void serviceInit(Configuration conf) throws Exception { if (conf != config) { LOG.debug("Config has been overridden during init"); setConfig(conf); } } /** * Actions called during the INITED to STARTED transition. * * This method will only ever be called once during the lifecycle of * a specific service instance. * * Implementations do not need to be synchronized as the logic * in {@link #start()} prevents re-entrancy. * * @throws Exception if needed -these will be caught, * wrapped, and trigger a service stop */ /* * 在INITED到STARTED过渡期间所采取的行动。 * 该方法只能在特定服务实例的生命周期中被调用一次。 * 实现不需要同步机制,因为{@link #start()}中的逻辑可以防止重新进入。 */ protected void serviceStart() throws Exception { } /** * Actions called during the transition to the STOPPED state. * * This method will only ever be called once during the lifecycle of * a specific service instance. * * Implementations do not need to be synchronized as the logic * in {@link #stop()} prevents re-entrancy. * * Implementations MUST write this to be robust against failures, including * checks for null references -and for the first failure to not stop other * attempts to shut down parts of the service. * * @throws Exception if needed -these will be caught and logged. */ /* * 在转换到STOPPED状态期间调用的动作。 * 该方法只能在特定服务实例的生命周期中被调用一次。 * 实现不需要同步机制,因为{@link #stop()}中的逻辑可以防止重入。 * 实现MUST写入这个要健壮来避免失败, 包括对空引用的检查,以及第一个不能停止其他尝试关闭部分服务的失败。 */ protected void serviceStop() throws Exception { } /* * 将监听器注册到服务状态更改事件。 * 如果提供的侦听器已经在监听此服务,则此方法是无效的。 * 参数 l 表示:一个新的监听器 */ @Override public void registerServiceListener(ServiceStateChangeListener l) { listeners.add(l); } /* * 取消注册先前注册的服务状态更改事件的侦听器。 如果监听器已经被注销,则不用操作。 * 参数 l 表示:要注销的监听器 */ @Override public void unregisterServiceListener(ServiceStateChangeListener l) { listeners.remove(l); } /** * Register a global listener, which receives notifications * from the state change events of all services in the JVM * @param l listener */ //注册一个全局监听器,它从JVM中所有服务的状态更改事件接收通知 public static void registerGlobalListener(ServiceStateChangeListener l) { globalListeners.add(l); } /** * unregister a global listener. * @param l listener to unregister * @return true if the listener was found (and then deleted) */ //取消注册全局监听器。 public static boolean unregisterGlobalListener(ServiceStateChangeListener l) { return globalListeners.remove(l); } /** * Package-scoped method for testing -resets the global listener list */ //用于测试的程序包范围的方法 - 重新设置全局侦听器列表 @VisibleForTesting static void resetGlobalListeners() { globalListeners.reset(); } /* * 获取服务的名称。 * 返回:服务的名称 */ @Override public String getName() { return name; } /* * 获取该服务的配置信息。 * 这通常不是一个克隆,并且可能被操纵,尽管不能保证这种行为的后果可能如何 * 返回:当前配置,除非具体实现选择。 */ @Override public synchronized Configuration getConfig() { return config; } /* * 获取服务的开始时间。 * 返回:服务的开始时间。 如果服务尚未启动,则为零。 */ @Override public long getStartTime() { return startTime; } /** * Notify local and global listeners of state changes. * Exceptions raised by listeners are NOT passed up. */ //通知本地和全局监听器的状态变化。监听器提出的异常情况不会被传递。 private void notifyListeners() { try { listeners.notifyListeners(this); globalListeners.notifyListeners(this); } catch (Throwable e) { LOG.warn("Exception while notifying listeners of " + this + ": " + e, e); } } /** * Add a state change event to the lifecycle history */ //将状态更改事件添加到生命周期历史记录 private void recordLifecycleEvent() { LifecycleEvent event = new LifecycleEvent(); event.time = System.currentTimeMillis(); event.state = getServiceState(); lifecycleHistory.add(event); } /* * 获取生命周期历史的快照; 它是一个静态列表 * 返回:一个可能是empty的但从不是null的生命周期事件列表。 */ @Override public synchronized List<LifecycleEvent> getLifecycleHistory() { return new ArrayList<LifecycleEvent>(lifecycleHistory); } /** * Enter a state; record this via {@link #recordLifecycleEvent} * and log at the info level. * @param newState the proposed new state * @return the original state * it wasn\'t already in that state, and the state model permits state re-entrancy. */ /* * 输入状态; 记录这个通过{@link #recordLifecycleEvent}并以信息级别记录在日志。 * 参数 newState 表示 提出新的状态 * 返回:原来的状态还没有在这个状态,状态模式允许状态重新进入。 */ private STATE enterState(STATE newState) { assert stateModel != null : "null state in " + name + " " + this.getClass(); STATE oldState = stateModel.enterState(newState); if (oldState != newState) { if (LOG.isDebugEnabled()) { LOG.debug( "Service: " + getName() + " entered state " + getServiceState()); } recordLifecycleEvent(); } return oldState; } /* * 查询状态是否处于特定状态 * 参数 表示提出新的状态 */ @Override public final boolean isInState(Service.STATE expected) { return stateModel.isInState(expected); } @Override public String toString() { return "Service " + name + " in state " + stateModel; } /** * Put a blocker to the blocker map -replacing any * with the same name. * @param name blocker name * @param details any specifics on the block. This must be non-null. */ /* * 将拦截器放在拦截器map上 - 重新放置任何具有相同名称的。 * 参数 name 表示:拦截器名称 * 参数 details 表示:详细说明块上的细节。 这必须是非空。 */ protected void putBlocker(String name, String details) { synchronized (blockerMap) { blockerMap.put(name, details); } } /** * Remove a blocker from the blocker map - * this is a no-op if the blocker is not present * @param name the name of the blocker */ /* * 从拦截器map中移除一个拦截器 - 如果拦截器不存在,这是空操作 * 参数 name 表示:拦截器的名称 */ public void removeBlocker(String name) { synchronized (blockerMap) { blockerMap.remove(name); } } /* * 获取一个服务的拦截器 - 远程依赖关系,使服务不再是<i>live</i>。 * 返回:一个拦截器名称-&gt的(快照)map;描述值 */ @Override public Map<String, String> getBlockers() { synchronized (blockerMap) { Map<String, String> map = 以上是关于Hadoop 三大调度器分析的主要内容,如果未能解决你的问题,请参考以下文章必知Hadoop工作流引擎调度器--Azkaban与Oozie的区别。