Hadoop 三大调度器分析

Posted 秦时明月0515

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop 三大调度器分析相关的知识,希望对你有一定的参考价值。

如要转载,请注上作者和出处。  由于能力有限,如有错误,请大家指正。

须知: 我们下载的是hadoop-2.7.3-src 源码。 这个版本默认调度器是Capacity调度器。 在2.0.2-alpha版本的时候,有人汇报了一个fifo调度器的bug,社区把默认调度器从原来的fifo切换成capacity了。  参考   

  在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器。当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheduler和FairScheduler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器。本文介绍了Hadoop调度器的基本编写方法,  参考1    

  Hadoop1 调度框架:Hadoop的调度器是在JobTracker中加载和调用的,用户可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler属性中指定调度器。本节分析了Hadoop调度器的调度框架,实际上分析了两个重要类:TaskScheduler和JobTracker的关系。

  Hadoop2 调度框架:Hadoop的调度器是在ResourceManager中加载和调用的,用户可以在配置文件yarn-site.xml中的yarn.resourcemanager.scheduler.class属性中指定调度器,默认是 org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler ; 还可以配置Fifo调度器,org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler ; 还可以配置Fair调度器, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler 。 本节分析了Hadoop调度器的调度框架, 类比Hadoop1 , 三个调度器的共同扩展类是 AbstractYarnScheduler <T extends SchedulerApplicationAttempt, N extends SchedulerNode> , 它的功能类似Hadoop1的TaskScheduler ; 如果用户要编写自己的调度器,需要继承抽象类AbstractYarnScheduler

  MapReduce在Hadoop2中称为MR2或YARN,将JobTracker中的资源管理及任务生命周期管理(包括定时触发及监控),拆分成两个独立的服务,用于管理全部资源的ResourceManager以及管理每个应用的ApplicationMaster,ResourceManager用于管理向应用程序分配计算资源,每个ApplicationMaster用于管理应用程序、调度以及协调。一个应用程序可以是经典的MapReduce架构中的一个单独的任务,也可以是这些任务的一个DAG(有向无环图)任务。ResourceManager及每台机上的NodeManager服务,用于管理那台机的用户进程,形成计算架构。每个应用程序的ApplicationMaster实际上是一个框架具体库,并负责从ResourceManager中协调资源及与NodeManager(s)协作执行并监控任务。  参考2

  针对Hadoop 1.0中的MapReduce在扩展性和多框架支持等方面的不足,它将JobTracker中的资源管理和作业控制功能分开,分别由组件ResourceManager和ApplicationMaster实现,其中,ResourceManager负责所有应用程序的资源分配,而ApplicationMaster仅负责管理一个应用程序,进而诞生了全新的通用资源管理框架YARN。基于YARN,用户可以运行各种类型的应用程序(不再像1.0那样仅局限于MapReduce一类应用),从离线计算的MapReduce到在线计算(流式处理)的Storm等。Hadoop 2.0对应Hadoop版本为Apache Hadoop 0.23.x、2.x和CDH4。

架构图:

 

其中ResourceManager包含两个主要的组件:定时调用器(Scheduler)以及应用管理器(ApplicationManager)。

定时调用器(Scheduler): 定时调度器负责向应用程序分配置资源,它不做监控以及应用程序的状 态跟踪,并且它不保证会重启由于应用程序本身或硬件出错而执行失败 的应用程序。

应用管理器(ApplicationManager): 应用程序管理器负责接收新任务,协调并提供在ApplicationMaster容 器失败时的重启功能。

节点管理器(NodeManager): NodeManager是ResourceManager在每台机器的上代理,负责容器的管 理,并监控他们的资源使用情况(cpu,内存,磁盘及网络等),以及向 ResourceManager/Scheduler提供这些资源使用报告。

应用总管(ApplicationMaster): 每个应用程序的ApplicationMaster负责从Scheduler申请资源,以及 跟踪这些资源的使用情况以及任务进度的监控。

1  调度器

我们先想分析调度器,首先要分析它的父类,以及父类的父类和实现接口,如 AbstractService, YarnScheduler, ResourceScheduler 以及 AbstractYarnScheduler, 如下所示:

AbstractService.java 在 hadoop-2.7.3-src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/AbstractService.java 

package org.apache.hadoop.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;

import com.google.common.annotations.VisibleForTesting;

/**
 * This is the base implementation class for services.
 */
//这是服务的基本实现类。
@Public
@Evolving
public abstract class AbstractService implements Service {

  private static final Log LOG = LogFactory.getLog(AbstractService.class);

  /**
   * Service name.
   */
  //服务名称
  private final String name;

  /** service state */
  //服务状态
  private final ServiceStateModel stateModel;

  /**
   * Service start time. Will be zero until the service is started.
   */
  //服务开始时间。在服务开始之前为0。
  private long startTime;

  /**
   * The configuration. Will be null until the service is initialized.
   */
  //配置。在服务初始化之前为null。
  private volatile Configuration config;

  /**
   * List of state change listeners; it is final to ensure
   * that it will never be null.
   */
  //状态更改侦听器列表;最终确保它不为null。
  private final ServiceOperations.ServiceListeners listeners
    = new ServiceOperations.ServiceListeners();
  /**
   * Static listeners to all events across all services
   */
  //所有服务的所有事件的静态监听器
  private static ServiceOperations.ServiceListeners globalListeners
    = new ServiceOperations.ServiceListeners();

  /**
   * The cause of any failure -will be null.
   * if a service did not stop due to a failure.
   */
  //任何失败的原因 - 是因为null。 如果服务没有因为故障停止。
  private Exception failureCause;

  /**
   * the state in which the service was when it failed.
   * Only valid when the service is stopped due to a failure
   */
  //服务失败时的状态。仅当服务由于失败而停止时才有效。
  private STATE failureState = null;

  /**
   * object used to co-ordinate {@link #waitForServiceToStop(long)}
   * across threads.
   */
  //对象用于协调 {@link #waitForServiceToStop(long)} 跨线程。
  private final AtomicBoolean terminationNotification =
    new AtomicBoolean(false);

  /**
   * History of lifecycle transitions
   */
  //生命周期转换的历史
  private final List<LifecycleEvent> lifecycleHistory
    = new ArrayList<LifecycleEvent>(5);

  /**
   * Map of blocking dependencies
   */
  //阻止依赖关系的映射
  private final Map<String,String> blockerMap = new HashMap<String, String>();

  private final Object stateChangeLock = new Object();
 
  /**
   * Construct the service.
   * @param name service name
   */
  //构造服务
  public AbstractService(String name) {
    this.name = name;
    stateModel = new ServiceStateModel(name);
  }

  /*
   * 获取当前的服务状态。
   * 返回:服务的状态
   */
  @Override
  public final STATE getServiceState() {
    return stateModel.getState();
  }

  /*
   * 获取服务失败时引发的第一个异常。 如果为空,则不记录任何异常
   * 返回:在转换到停止状态期间日志记录的故障
   */
  @Override
  public final synchronized Throwable getFailureCause() {
    return failureCause;
  }

  /*
   * 获取发生在{@link #getFailureCause()}中失败的状态。
   * 返回:状态,如果没有失败,则为null
   */
  @Override
  public synchronized STATE getFailureState() {
    return failureState;
  }

  /**
   * Set the configuration for this service.
   * This method is called during {@link #init(Configuration)}
   * and should only be needed if for some reason a service implementation
   * needs to override that initial setting -for example replacing
   * it with a new subclass of {@link Configuration}
   * @param conf new configuration.
   */
  /*
   * 设置此服务的配置。当{@link #init(Configuration)}时该方法会被调用并且
   * 只有在某些原因出现,服务实现需要覆盖该初始设置的情况下才需要这样做 - 例如
   * 用{@link Configuration}的新子类替换它。
   */
  protected void setConfig(Configuration conf) {
    this.config = conf;
  }

  /**
   * {@inheritDoc}
   * This invokes {@link #serviceInit}
   * @param conf the configuration of the service. This must not be null
   * @throws ServiceStateException if the configuration was null,
   * the state change not permitted, or something else went wrong
   */
  //这将调用{@link #serviceInit}
  //子类的serviceInit会初始化所需服务,会创建相应的服务类然后加入服务列表
  @Override
  public void init(Configuration conf) {
    //服务配置是否为空
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
                                      + getName() + ": null configuration");
    }
    //服务是否已经初始化
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          //服务初始化,会进入子类的同名函数
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            //if the service ended up here during init,
            //notify the listeners
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   * @throws ServiceStateException if the current service state does not permit
   * this action
   */
  //开始服务
  @Override
  public void start() {
    if (isInState(STATE.STARTED)) {
      return;
    }
    //enter the started state
    synchronized (stateChangeLock) {
      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
        try {
          startTime = System.currentTimeMillis();
          serviceStart();
          if (isInState(STATE.STARTED)) {
            //if the service started (and isn\'t now in a later state), notify
            if (LOG.isDebugEnabled()) {
              LOG.debug("Service " + getName() + " is started");
            }
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  //停止服务
  @Override
  public void stop() {
    if (isInState(STATE.STOPPED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.STOPPED) != STATE.STOPPED) {
        try {
          serviceStop();
        } catch (Exception e) {
          //stop-time exceptions are logged if they are the first one,
          noteFailure(e);
          throw ServiceStateException.convert(e);
        } finally {
          //report that the service has terminated
          terminationNotification.set(true);
          synchronized (terminationNotification) {
            terminationNotification.notifyAll();
          }
          //notify anything listening for events
          notifyListeners();
        }
      } else {
        //already stopped: note it
        if (LOG.isDebugEnabled()) {
          LOG.debug("Ignoring re-entrant call to stop()");
        }
      }
    }
  }

  /**
   * Relay to {@link #stop()}
   * @throws IOException
   */
  @Override
  public final void close() throws IOException {
    stop();
  }

  /**
   * Failure handling: record the exception
   * that triggered it -if there was not one already.
   * Services are free to call this themselves.
   * @param exception the exception
   */
  /*
   * 故障处理:记录触发它的异常 - 如果还没有一个。 服务可以自由调用。
   */
  protected final void noteFailure(Exception exception) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("noteFailure " + exception, null);
    }
    if (exception == null) {
      //make sure failure logic doesn\'t itself cause problems
      return;
    }
    //record the failure details, and log it
    //记录故障细节,并记录日志
    synchronized (this) {
      if (failureCause == null) {
        failureCause = exception;
        failureState = getServiceState();
        LOG.info("Service " + getName()
                 + " failed in state " + failureState
                 + "; cause: " + exception,
                 exception);
      }
    }
  }

  /*
   * 阻止等待服务停止; 使用终止通知对象这样做。
   * 该方法只有在执行所有服务停止操作(成功或失败)之后才返回,或超时已过
   * 该方法可以在服务初始化或启动之前调用; 这是为了消除任何竞争条件,服务在此事件发生之前停止。
   */
  @Override
  public final boolean waitForServiceToStop(long timeout) {
    boolean completed = terminationNotification.get();
    while (!completed) {
      try {
        synchronized(terminationNotification) {
          terminationNotification.wait(timeout);
        }
        // here there has been a timeout, the object has terminated,
        // or there has been a spurious wakeup (which we ignore)
        //这里有一个超时,对象已经终止了,或者有一个虚假的唤醒(我们忽略)
        completed = true;
      } catch (InterruptedException e) {
        // interrupted; have another look at the flag
        completed = terminationNotification.get();
      }
    }
    return terminationNotification.get();
  }

  /* ===================================================================== */
  /* Override Points */
  /* ===================================================================== */

  /**
   * All initialization code needed by a service.
   *
   * This method will only ever be called once during the lifecycle of
   * a specific service instance.
   *
   * Implementations do not need to be synchronized as the logic
   * in {@link #init(Configuration)} prevents re-entrancy.
   *
   * The base implementation checks to see if the subclass has created
   * a new configuration instance, and if so, updates the base class value
   * @param conf configuration
   * @throws Exception on a failure -these will be caught,
   * possibly wrapped, and wil; trigger a service stop
   */
  /*
   * 服务所需的所有初始化代码。
   * 该方法只能在特定服务实例的生命周期中被调用一次。
   * 实现不需要同步机制,因为{@link #init(Configuration))中的逻辑可以防止重新进入。
   * 基本实现检查子类是否已创建新的配置实例,如果是,则更新基类值。
   */
  protected void serviceInit(Configuration conf) throws Exception {
    if (conf != config) {
      LOG.debug("Config has been overridden during init");
      setConfig(conf);
    }
  }

  /**
   * Actions called during the INITED to STARTED transition.
   *
   * This method will only ever be called once during the lifecycle of
   * a specific service instance.
   *
   * Implementations do not need to be synchronized as the logic
   * in {@link #start()} prevents re-entrancy.
   *
   * @throws Exception if needed -these will be caught,
   * wrapped, and trigger a service stop
   */
  /*
   * 在INITED到STARTED过渡期间所采取的行动。
   * 该方法只能在特定服务实例的生命周期中被调用一次。
   * 实现不需要同步机制,因为{@link #start()}中的逻辑可以防止重新进入。
   */
  protected void serviceStart() throws Exception {

  }

  /**
   * Actions called during the transition to the STOPPED state.
   *
   * This method will only ever be called once during the lifecycle of
   * a specific service instance.
   *
   * Implementations do not need to be synchronized as the logic
   * in {@link #stop()} prevents re-entrancy.
   *
   * Implementations MUST write this to be robust against failures, including
   * checks for null references -and for the first failure to not stop other
   * attempts to shut down parts of the service.
   *
   * @throws Exception if needed -these will be caught and logged.
   */
  /*
   * 在转换到STOPPED状态期间调用的动作。
   * 该方法只能在特定服务实例的生命周期中被调用一次。
   * 实现不需要同步机制,因为{@link #stop()}中的逻辑可以防止重入。
   * 实现MUST写入这个要健壮来避免失败, 包括对空引用的检查,以及第一个不能停止其他尝试关闭部分服务的失败。
   */
  protected void serviceStop() throws Exception {

  }

  /*
   * 将监听器注册到服务状态更改事件。
   * 如果提供的侦听器已经在监听此服务,则此方法是无效的。
   * 参数 l 表示:一个新的监听器
   */
  @Override
  public void registerServiceListener(ServiceStateChangeListener l) {
    listeners.add(l);
  }

  /*
   * 取消注册先前注册的服务状态更改事件的侦听器。 如果监听器已经被注销,则不用操作。
   * 参数 l 表示:要注销的监听器
   */
  @Override
  public void unregisterServiceListener(ServiceStateChangeListener l) {
    listeners.remove(l);
  }

  /**
   * Register a global listener, which receives notifications
   * from the state change events of all services in the JVM
   * @param l listener
   */
  //注册一个全局监听器,它从JVM中所有服务的状态更改事件接收通知
  public static void registerGlobalListener(ServiceStateChangeListener l) {
    globalListeners.add(l);
  }

  /**
   * unregister a global listener.
   * @param l listener to unregister
   * @return true if the listener was found (and then deleted)
   */
  //取消注册全局监听器。
  public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
    return globalListeners.remove(l);
  }

  /**
   * Package-scoped method for testing -resets the global listener list
   */
  //用于测试的程序包范围的方法 - 重新设置全局侦听器列表
  @VisibleForTesting
  static void resetGlobalListeners() {
    globalListeners.reset();
  }

  /*
   * 获取服务的名称。
   * 返回:服务的名称
   */
  @Override
  public String getName() {
    return name;
  }

  /*
   * 获取该服务的配置信息。
   * 这通常不是一个克隆,并且可能被操纵,尽管不能保证这种行为的后果可能如何
   * 返回:当前配置,除非具体实现选择。
   */
  @Override
  public synchronized Configuration getConfig() {
    return config;
  }

  /*
   * 获取服务的开始时间。
   * 返回:服务的开始时间。 如果服务尚未启动,则为零。
   */
  @Override
  public long getStartTime() {
    return startTime;
  }

  /**
   * Notify local and global listeners of state changes.
   * Exceptions raised by listeners are NOT passed up.
   */
  //通知本地和全局监听器的状态变化。监听器提出的异常情况不会被传递。
  private void notifyListeners() {
    try {
      listeners.notifyListeners(this);
      globalListeners.notifyListeners(this);
    } catch (Throwable e) {
      LOG.warn("Exception while notifying listeners of " + this + ": " + e,
               e);
    }
  }

  /**
   * Add a state change event to the lifecycle history
   */
  //将状态更改事件添加到生命周期历史记录
  private void recordLifecycleEvent() {
    LifecycleEvent event = new LifecycleEvent();
    event.time = System.currentTimeMillis();
    event.state = getServiceState();
    lifecycleHistory.add(event);
  }

  /*
   * 获取生命周期历史的快照; 它是一个静态列表
   * 返回:一个可能是empty的但从不是null的生命周期事件列表。
   */
  @Override
  public synchronized List<LifecycleEvent> getLifecycleHistory() {
    return new ArrayList<LifecycleEvent>(lifecycleHistory);
  }

  /**
   * Enter a state; record this via {@link #recordLifecycleEvent}
   * and log at the info level.
   * @param newState the proposed new state
   * @return the original state
   * it wasn\'t already in that state, and the state model permits state re-entrancy.
   */
  /*
   * 输入状态; 记录这个通过{@link #recordLifecycleEvent}并以信息级别记录在日志。
   * 参数 newState 表示 提出新的状态
   * 返回:原来的状态还没有在这个状态,状态模式允许状态重新进入。
   */
  private STATE enterState(STATE newState) {
    assert stateModel != null : "null state in " + name + " " + this.getClass();
    STATE oldState = stateModel.enterState(newState);
    if (oldState != newState) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Service: " + getName() + " entered state " + getServiceState());
      }
      recordLifecycleEvent();
    }
    return oldState;
  }

  /*
   * 查询状态是否处于特定状态
   * 参数 表示提出新的状态
   */
  @Override
  public final boolean isInState(Service.STATE expected) {
    return stateModel.isInState(expected);
  }

  @Override
  public String toString() {
    return "Service " + name + " in state " + stateModel;
  }

  /**
   * Put a blocker to the blocker map -replacing any
   * with the same name.
   * @param name blocker name
   * @param details any specifics on the block. This must be non-null.
   */
  /*
   * 将拦截器放在拦截器map上 - 重新放置任何具有相同名称的。
   * 参数 name 表示:拦截器名称
   * 参数 details 表示:详细说明块上的细节。 这必须是非空。
   */
  protected void putBlocker(String name, String details) {
    synchronized (blockerMap) {
      blockerMap.put(name, details);
    }
  }

  /**
   * Remove a blocker from the blocker map -
   * this is a no-op if the blocker is not present
   * @param name the name of the blocker
   */
  /*
   * 从拦截器map中移除一个拦截器 - 如果拦截器不存在,这是空操作
   * 参数 name 表示:拦截器的名称
   */
  public void removeBlocker(String name) {
    synchronized (blockerMap) {
      blockerMap.remove(name);
    }
  }

  /*
   * 获取一个服务的拦截器 - 远程依赖关系,使服务不再是<i>live</i>。
   * 返回:一个拦截器名称-&gt的(快照)map;描述值
   */
  @Override
  public Map<String, String> getBlockers() {
    synchronized (blockerMap) {
      Map<String, String> map = 以上是关于Hadoop 三大调度器分析的主要内容,如果未能解决你的问题,请参考以下文章

必知Hadoop工作流引擎调度器--Azkaban与Oozie的区别。

Hadoop系列Hadoop三大核心之Yarn-资源调度初探

Hadoop生态圈中的调度组件-YARN

第三章 Goroutine调度策略(16)

Hadoop三大组件以及Hive的基础认知

Hadoop知识汇总