Analysis of the YARN Active ResourceManager Startup Framework

Posted by 有山先生

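1. Introduction

The first article in this series, on the design of YARN Service, showed that YARN copes with its many functions by wrapping each one in a Service and by using CompositeService to manage the init/start/stop lifecycle of child services in a uniform way. ResourceManager extends CompositeService and follows the same lifecycle. More importantly, starting ResourceManager also triggers the election of the Active ResourceManager through ZooKeeper. Starting from ResourceManager service startup, this article takes a close look at the Active ResourceManager election framework.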

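2. Which services ResourceManager starts

The services that ResourceManager provides fall into two groups, resident services and active services:

  • Resident services run on both ResourceManagers. The specific services are as follows:

  • Active services run only on the one Active ResourceManager. The specific services are as follows:

The conclusions about how ActiveServices is initialized, started, and managed come first; the detailed analysis follows later: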

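3. Why ActiveServices is not a child service of ResourceManager

ActiveServices may be started on only one ResourceManager, so its start logic has to differ from ResourceManager's own. ResourceManager starts all of its child services indiscriminately, and at that point it is not yet known which ResourceManager will become the Active ResourceManager. If ActiveServices were registered as a child service, it would be started on both ResourceManagers, which would violate the requirement that ActiveServices be unique.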
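
The argument above can be illustrated with a minimal sketch. It is not YARN code: SketchService, SketchCompositeService, and SketchResourceManager are hypothetical stand-ins that model only the start step, under the assumption that a composite service starts every registered child unconditionally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a YARN Service; only start() is modeled. */
class SketchService {
  final String name;
  boolean started = false;

  SketchService(String name) { this.name = name; }

  void start() { started = true; }
}

/** Hypothetical stand-in for CompositeService: start() starts every child. */
class SketchCompositeService extends SketchService {
  private final List<SketchService> children = new ArrayList<>();

  SketchCompositeService(String name) { super(name); }

  void addService(SketchService s) { children.add(s); }

  @Override
  void start() {
    for (SketchService child : children) {
      child.start();   // no per-child condition: every registered child starts
    }
    super.start();
  }
}

class SketchResourceManager extends SketchCompositeService {
  // Held as a plain field, NOT registered through addService().
  final SketchCompositeService activeServices =
      new SketchCompositeService("RMActiveServices");
  final SketchService elector = new SketchService("EmbeddedElector");

  SketchResourceManager() {
    super("ResourceManager");
    addService(elector);   // resident service, started on every RM
  }

  /** Meant to be called only by the election winner. */
  void transitionToActive() { activeServices.start(); }
}
```

Starting two SketchResourceManager instances starts the elector on both, while activeServices stays untouched until transitionToActive() is called on the instance that won the election.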

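4. When ActiveServices is started

To elect the Active ResourceManager among the two ResourceManagers, and thereby keep ActiveServices unique, ResourceManager provides a resident child service, EmbeddedElector. EmbeddedElector holds a connection to ZooKeeper. When EmbeddedElector starts, it competes for a mutually exclusive lock in ZooKeeper. The ResourceManager that gets the lock switches to the Active state and starts ActiveServices; the other one switches to the Standby state and keeps only its resident services running. EmbeddedElector also registers a watcher with ZooKeeper, so that when the state of the Active ResourceManager changes, the watcher callback switches the ResourceManager state right away.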
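
The election idea described above can be sketched without ZooKeeper. In the following hypothetical model, SketchLock plays the role of the lock node and SketchElector the role of the election service; both names, and the one-shot watch behaviour, are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for the ZooKeeper lock node. */
class SketchLock {
  private String owner;                       // null means the lock is free
  private final List<Runnable> watchers = new ArrayList<>();

  /** Mutually exclusive: only the first caller gets the lock. */
  synchronized boolean tryAcquire(String candidate) {
    if (owner == null) {
      owner = candidate;
      return true;
    }
    return false;
  }

  synchronized void watch(Runnable onRelease) { watchers.add(onRelease); }

  /** Models the Active RM losing its session: the lock disappears. */
  void release(String candidate) {
    List<Runnable> toNotify;
    synchronized (this) {
      if (!candidate.equals(owner)) {
        return;
      }
      owner = null;
      toNotify = new ArrayList<>(watchers);
      watchers.clear();                       // watches are one-shot in this sketch
    }
    for (Runnable w : toNotify) {
      w.run();                                // notify outside the lock
    }
  }
}

class SketchElector {
  enum State { INIT, ACTIVE, STANDBY }

  final String rmId;
  private final SketchLock lock;
  State state = State.INIT;

  SketchElector(String rmId, SketchLock lock) {
    this.rmId = rmId;
    this.lock = lock;
  }

  void joinElection() {
    if (lock.tryAcquire(rmId)) {
      state = State.ACTIVE;                   // winner: would start ActiveServices here
    } else {
      state = State.STANDBY;                  // loser: resident services only
      lock.watch(this::joinElection);         // retry as soon as the lock is released
    }
  }

  void crash() {
    state = State.INIT;
    lock.release(rmId);
  }
}
```

With two electors sharing one SketchLock, the first call to joinElection() wins and the second becomes standby; calling crash() on the winner releases the lock and the watcher promotes the other elector.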

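5. ResourceManager child service initialization and startup flow

As far as Active ResourceManager startup is concerned, the important child services are initialized and started in this order:

  • EmbeddedElector election service initialization
  • ActiveServices initialization
  • EmbeddedElector election service startup
  • ActiveServices startup

The corresponding code is as follows: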

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public static void main(String argv[]) {
    // omitted
    ResourceManager resourceManager = new ResourceManager();
    // Calls AbstractService.init, which in turn calls ResourceManager.serviceInit
    resourceManager.init(conf);
    // Likewise, ends up calling ResourceManager.serviceStart
    resourceManager.start();
    // omitted
  }
}

5.1 ResourceManager initialization flow

ResourceManager#init is inherited from AbstractService#init, which ends up calling ResourceManager#serviceInit:

public abstract class AbstractService implements Service {
  public void init(Configuration conf) {
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
                                      + getName() + ": null configuration");
    }
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            //if the service ended up here during init,
            //notify the listeners
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }
}
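
The guard in init can be tried out in isolation. The sketch below is a simplified, hypothetical model (SketchLifecycle and SketchCountingService are not Hadoop classes). It assumes that enterState returns the state held before the call, which is what the comparison in the excerpt relies on, and it leaves out the validation of illegal transitions.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified model of the init() guard in AbstractService. */
abstract class SketchLifecycle {
  enum State { NOTINITED, INITED, STARTED, STOPPED }

  private final Object stateChangeLock = new Object();
  private State state = State.NOTINITED;

  /** Moves to the new state and returns the state held before the call. */
  private State enterState(State next) {
    State old = state;
    state = next;
    return old;
  }

  State getState() {
    synchronized (stateChangeLock) {
      return state;
    }
  }

  final void init() {
    synchronized (stateChangeLock) {
      if (enterState(State.INITED) != State.INITED) {
        try {
          serviceInit();          // subclass hook, runs at most once
        } catch (Exception e) {
          state = State.STOPPED;  // stands in for stopping the service quietly
          throw new IllegalStateException(e);
        }
      }
    }
  }

  protected abstract void serviceInit() throws Exception;
}

class SketchCountingService extends SketchLifecycle {
  final AtomicInteger initCalls = new AtomicInteger();

  @Override
  protected void serviceInit() {
    initCalls.incrementAndGet();
  }
}
```

Calling init() twice on a SketchCountingService runs serviceInit only once, which is why a subclass such as ResourceManager can put one-time setup into serviceInit.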

ResourceManager#serviceInit creates the EmbeddedElector service and registers it as a child service. It then calls ResourceManager#createAndInitActiveServices to create and initialize the separate ActiveServices service:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    // Register resident services, for example AdminService
    adminService = createAdminService();
    addService(adminService);
    rmContext.setRMAdminService(adminService);
    // omitted
    if (this.rmContext.isHAEnabled()) {
      // If the RM is configured to use an embedded leader elector,
      // initialize the leader elector.
      if (HAUtil.isAutomaticFailoverEnabled(conf)
          && HAUtil.isAutomaticFailoverEmbedded(conf)) {
        EmbeddedElector elector = createEmbeddedElector();
        addIfService(elector);
        rmContext.setLeaderElectorService(elector);
      }
    }
    // omitted
    createAndInitActiveServices(false);
    // omitted
  }
}

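5.2 Creating the election service EmbeddedElector

ResourceManager picks the concrete EmbeddedElector implementation from the yarn.resourcemanager.ha.curator-leader-elector.enabled setting. If it is true, the implementation is CuratorBasedElectorService, which is built on Curator, a ZooKeeper client framework that wraps the native ZooKeeper API. Otherwise the default implementation, ActiveStandbyElectorBasedElectorService, is used; it is built directly on the native ZooKeeper API: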

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected EmbeddedElector createEmbeddedElector() throws IOException {
    EmbeddedElector elector;
    curatorEnabled =
        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    if (curatorEnabled) {
      this.zkManager = createAndStartZKManager(conf);
      elector = new CuratorBasedElectorService(this);
    } else {
      elector = new ActiveStandbyElectorBasedElectorService(this);
    }
    return elector;
  }
}

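5.3 Initializing the ActiveStandbyElectorBasedElectorService election service

This article discusses the Active ResourceManager election flow in terms of ActiveStandbyElectorBasedElectorService. Its initialization method determines the ZooKeeper path used for the election. As the code below shows, the election znode is the base path (by default /yarn-leader-election) followed by the cluster ID, and the lock node ActiveStandbyElectorLock sits directly under it. The ElectorService of every ResourceManager tries to create this ephemeral node in ZooKeeper, and the ResourceManager whose create call succeeds is eventually elected Active ResourceManager.

Most importantly, ActiveStandbyElectorBasedElectorService creates its ActiveStandbyElector member during initialization: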

public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
    // omitted
    String rmId = HAUtil.getRMHAId(conf);
    String clusterId = YarnConfiguration.getClusterId(conf);
    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    String electionZNode = zkBasePath + "/" + clusterId;

    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

    List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
    List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
    // omitted
    // Create the elector object
    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);

    elector.ensureParentZNode();
    // omitted
    super.serviceInit(conf);
  }
}

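ActiveStandbyElector connects to the ZooKeeper server, maintains the watcher, monitors the state of the mutual-exclusion lock node ActiveStandbyElectorLock under the election znode, and switches the ResourceManager state according to it.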
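
To make the path layout concrete, here is a small sketch of how the pieces are concatenated. SketchElectionPaths is a hypothetical helper, and the two string constants reflect my understanding of the Hadoop defaults; treat them as assumptions rather than as values taken from the excerpt.

```java
/** Hypothetical helper mirroring how the excerpts assemble znode paths. */
class SketchElectionPaths {
  // Assumed literal values; the excerpts only show the constant names.
  static final String DEFAULT_BASE_PATH = "/yarn-leader-election";
  static final String LOCK_FILENAME = "ActiveStandbyElectorLock";

  /** Parent znode: one per cluster ID, so the cluster ID is part of the path. */
  static String electionZNode(String zkBasePath, String clusterId) {
    return zkBasePath + "/" + clusterId;
  }

  /** Lock node that the candidates race to create. */
  static String lockPath(String zkBasePath, String clusterId) {
    return electionZNode(zkBasePath, clusterId) + "/" + LOCK_FILENAME;
  }
}
```

For a cluster ID of yarn-cluster (a made-up value), lockPath(DEFAULT_BASE_PATH, "yarn-cluster") yields /yarn-leader-election/yarn-cluster/ActiveStandbyElectorLock.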

5.3.1 ZooKeeper connection

The ActiveStandbyElector constructor creates the connection to ZooKeeper:

  public ActiveStandbyElector(String zookeeperHostPorts, int zookeeperSessionTimeout,
      String parentZnodeName, List<ACL> acl, List<ZKAuthInfo> authInfo,
      ActiveStandbyElectorCallback app, int maxRetryNum, boolean failFast)
      throws IOException, HadoopIllegalArgumentException, KeeperException {
    if (app == null || acl == null || parentZnodeName == null
        || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
      throw new HadoopIllegalArgumentException("Invalid argument");
    }
    zkHostPort = zookeeperHostPorts;
    zkSessionTimeout = zookeeperSessionTimeout;
    zkAcl = acl;
    zkAuthInfo = authInfo;
    appClient = app;
    znodeWorkingDir = parentZnodeName;
    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
    this.maxRetryNum = maxRetryNum;

    // establish the ZK Connection for future API calls
    if (failFast) {
      createConnection();
    } else {
      reEstablishSession();
    }
  }

reEstablishSession wraps createConnection in a retry loop. Here we go straight to ActiveStandbyElector#createConnection:

  private void createConnection() throws IOException, KeeperException {
    // omitted
    zkClient = connectToZooKeeper();
    // omitted
  }

ActiveStandbyElector#connectToZooKeeper creates the Watcher object that listens for ZooKeeper events:

  protected synchronized ZooKeeper connectToZooKeeper() throws IOException, KeeperException {
    watcher = new WatcherWithClientRef();
    // createZooKeeper registers the watcher with ZooKeeper
    ZooKeeper zk = createZooKeeper();
    watcher.setZooKeeperRef(zk);
    // omitted
    watcher.waitForZKConnectionEvent(zkSessionTimeout);
    // omitted
    return zk;
  }

WatcherWithClientRef#process receives ZooKeeper events; the actual handling is done by ActiveStandbyElector#processWatchEvent:

  private final class WatcherWithClientRef implements Watcher {
    private ZooKeeper zk;

    // Other events are handled only after the connection event from the ZooKeeper server has arrived
    private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    // Events are handled only after the watcher has been given its ZooKeeper reference
    private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
    // ordinary methods omitted

    // process is the method through which the watcher handles ZooKeeper events
    @Override
    public void process(WatchedEvent event) {
      // omitted
      ActiveStandbyElector.this.processWatchEvent(zk, event);
      // omitted
    }
  }

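5.3.2 Handling ZooKeeper watch events

ActiveStandbyElector#processWatchEvent handles watch events. ZooKeeper states and event types correspond as follows:

Depending on the ZooKeeper state and the event type, the ResourceManager state is adjusted differently. The handling logic is shown below: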

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
  synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();

    // Connection-level events
    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED &&
            wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");

        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }

      return;
    }

    // The watched node has changed
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        if (LOG.isDebugEnabled()) {
          LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        }
        monitorActiveStatus();
      }

      return;
    }

    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }
}

In the end, the way ActiveStandbyElector handles ZooKeeper events comes down to moving ResourceManager into the active, standby, or neutral state. The transition logic is discussed next.
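
The branching in processWatchEvent can be condensed into a pure decision function, which makes the cases easy to compare side by side. SketchWatchDispatch and its enums are hypothetical names; the actions are labels for what the real handler does (for example, ENTER_NEUTRAL_AND_REJOIN stands for enterNeutralMode followed by rejoining the election), and the fatal-error paths are left out.

```java
/** Hypothetical condensation of processWatchEvent into a pure decision function. */
class SketchWatchDispatch {
  enum EventType { NONE, NODE_DELETED, NODE_DATA_CHANGED, OTHER }
  enum KeeperState { SYNC_CONNECTED, DISCONNECTED, EXPIRED, SASL_AUTHENTICATED }
  enum Conn { TERMINATED, CONNECTED, DISCONNECTED }
  enum Action { NOTHING, MONITOR_LOCK, ENTER_NEUTRAL, ENTER_NEUTRAL_AND_REJOIN, JOIN_ELECTION }

  static Action decide(EventType type, KeeperState zkState, Conn previous,
                       boolean wantToBeInElection, boolean currentlyActive) {
    if (type == EventType.NONE) {                 // connection-level event
      switch (zkState) {
        case SYNC_CONNECTED:
          // Only a reconnect after a disconnect triggers monitoring.
          return (previous == Conn.DISCONNECTED && wantToBeInElection)
              ? Action.MONITOR_LOCK : Action.NOTHING;
        case DISCONNECTED:
          return Action.ENTER_NEUTRAL;
        case EXPIRED:
          return Action.ENTER_NEUTRAL_AND_REJOIN;
        default:
          return Action.NOTHING;                  // SASL_AUTHENTICATED: log only
      }
    }
    switch (type) {                               // znode-level event
      case NODE_DELETED:
        // An active RM first steps back to neutral, then competes again.
        return currentlyActive ? Action.ENTER_NEUTRAL_AND_REJOIN : Action.JOIN_ELECTION;
      default:
        return Action.MONITOR_LOCK;               // data changed or unexpected event
    }
  }
}
```

For the very first connection the previous state is TERMINATED, so decide(NONE, SYNC_CONNECTED, TERMINATED, ...) returns NOTHING, whereas the same event after a DISCONNECTED state returns MONITOR_LOCK.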

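5.3.2.1 Competing for the active state

monitorLockNodeAsync asynchronously checks whether the lock node exists and registers the watcher again in the same call. What happens next (active, standby, or neutral) is decided in the exists callback: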

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void monitorLockNodeAsync() {
    monitorLockNodePending = true;
    monitorLockNodeClient = zkClient;
    zkClient.exists(zkLockFilePath, watcher, this, zkClient);
  }
}

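ActiveStandbyElector implements the callback for exists. Depending on who holds the distributed lock, the callback switches ResourceManager between the active and standby roles: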

public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx, Stat stat) {
    // omitted
    if (isSuccess(code)) {
      // If this ResourceManager owns the ZooKeeper lock node it becomes active, otherwise standby
      if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
        // Enter the active state
        if (!becomeActive()) {
          reJoinElectionAfterFailureToBecomeActive();
        }
      } else {
        // Enter the standby state
        becomeStandby();
      }
      return;
    }
    // The node does not exist: enter the neutral state and try to create the lock node
    if (isNodeDoesNotExist(code)) {
      enterNeutralMode();
      // Try to create the ZooKeeper lock node again
      joinElectionInternal();
      return;
    }
  }
}

5.3.2.2 Neutral state handling

If the connection is lost, the elector enters the NEUTRAL state:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void enterNeutralMode() {
    if (state != State.NEUTRAL) {
      state = State.NEUTRAL;
      appClient.enterNeutralMode();
    }
  }
}

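ActiveStandbyElector#enterNeutralMode calls enterNeutralMode on its appClient member. appClient is in fact the ActiveStandbyElectorBasedElectorService instance, so the call lands in ActiveStandbyElectorBasedElectorService#enterNeutralMode. In the neutral state ResourceManager has lost its connection to ZooKeeper, and it schedules a transition to standby that fires once the ZooKeeper session timeout has elapsed: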

public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void enterNeutralMode() {
    // omitted
    zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
    zkDisconnectTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        synchronized (zkDisconnectLock) {
          becomeStandby();
        }
      }
    }, zkSessionTimeout);
    // omitted
  }
}

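The grace-period idea can be sketched as a self-contained class. SketchDisconnectTimer is hypothetical; in particular, the null checks and the cancel method are my assumptions about what the omitted parts of the excerpt and the cancelDisconnectTimer call seen later in the article do, and a latch stands in for becomeStandby().

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the "grace period before standby" timer. */
class SketchDisconnectTimer {
  private final Object lock = new Object();
  private final long timeoutMs;
  private Timer timer;                          // non-null while a countdown is pending
  private final CountDownLatch becameStandby = new CountDownLatch(1);

  SketchDisconnectTimer(long timeoutMs) { this.timeoutMs = timeoutMs; }

  /** Connection lost: start the countdown unless one is already running. */
  void enterNeutralMode() {
    synchronized (lock) {
      if (timer != null) {
        return;
      }
      timer = new Timer("sketch disconnect timer", true);   // daemon thread
      timer.schedule(new TimerTask() {
        @Override
        public void run() {
          synchronized (lock) {
            if (timer != null) {                // skip if cancelled in the meantime
              becameStandby.countDown();        // stands in for becomeStandby()
            }
          }
        }
      }, timeoutMs);
    }
  }

  /** Connection restored or RM became active: abort the countdown. */
  void cancel() {
    synchronized (lock) {
      if (timer != null) {
        timer.cancel();
        timer = null;
      }
    }
  }

  /** Waits up to maxWaitMs for the standby transition; returns whether it happened. */
  boolean awaitStandby(long maxWaitMs) {
    try {
      return becameStandby.await(maxWaitMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

If the timer is left alone, the standby transition happens after the timeout; if cancel() is called first, it never happens. Using the session timeout as the delay gives a short ZooKeeper hiccup a chance to heal before ActiveServices is torn down.
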
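5.3.2.3 Session expiry handling

If the session expires, the elector closes the old connection and rejoins the election to try for the Active state again: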

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void reJoinElection(int sleepTime) {
    sessionReestablishLockForTests.lock();
    try {
      terminateConnection();
      sleepFor(sleepTime);
      if (appData != null) {
        joinElectionInternal();
      }
    } finally {
      sessionReestablishLockForTests.unlock();
    }
  }
}

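Finally, consider the first connection to ZooKeeper. The initial state is ConnectionState.TERMINATED. When the client successfully creates a session with the ZooKeeper server, it receives the state SyncConnected with event type Event.EventType.None. In processWatchEvent, the previous connection state is TERMINATED rather than DISCONNECTED, so the monitorActiveStatus branch is skipped and the code simply breaks out of the switch. In other words, once the client has established its first connection to the server, no further handling is needed.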

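5.4 Active Service initialization

During its own initialization, ResourceManager makes an extra method call to initialize ActiveServices. ActiveServices is not a child service of ResourceManager, so the child service init/start/stop flow of ResourceManager does not cover it: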

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    createAndInitActiveServices(false);
    // omitted
  }
}

ResourceManager#createAndInitActiveServices runs the initialization logic of activeServices:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void createAndInitActiveServices(boolean fromActive) {
    activeServices = new RMActiveServices(this);
    activeServices.fromActive = fromActive;
    activeServices.init(conf);
  }
}

activeServices is of type RMActiveServices. Its initialization consists of creating its child services and registering them:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      standByTransitionRunnable = new StandByTransitionRunnable();
      // omitted
      xxxService = createXxxService();
      addService(xxxService);
      // omitted
      super.serviceInit(conf);
    }
  }
}

5.5 Starting the election service

Once ActiveStandbyElectorBasedElectorService has been created and initialized, every ResourceManager has its own zkClient connected to the ZooKeeper server. Starting EmbeddedElector goes through the following call chain:
ActiveStandbyElectorBasedElectorService#serviceStart -> ActiveStandbyElector#joinElection -> ActiveStandbyElector#joinElectionInternal -> ActiveStandbyElector#createLockNodeAsync. ActiveStandbyElector#createLockNodeAsync is responsible for acquiring the Active ResourceManager lock:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
  }
}

createLockNodeAsync calls ZooKeeper#create to try to take the distributed lock and thereby enter the Active state:

public class ZooKeeper {
    public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode,  StringCallback cb, Object ctx)
    {
        // omitted
    }
}

StringCallback is an asynchronous callback: the client sends the create request to the server, the server replies asynchronously, and the client handles that reply in StringCallback#processResult.
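
The asynchronous create-then-callback pattern can be sketched with an in-memory store. Everything here is hypothetical (SketchAsyncStore, CreateCallback, SketchCandidate); a single-threaded executor plays the role of the client's event thread, and only two result codes are modeled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory stand-in for an asynchronous znode store. */
class SketchAsyncStore {
  enum Code { OK, NODE_EXISTS }

  /** Shaped after a string callback: result code, requested path, caller context. */
  interface CreateCallback {
    void processResult(Code rc, String path, Object ctx);
  }

  private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();
  private final ExecutorService eventThread = Executors.newSingleThreadExecutor();

  /** Returns immediately; the outcome is delivered later on the event thread. */
  void create(String path, String data, CreateCallback cb, Object ctx) {
    eventThread.execute(() -> {
      Code rc = nodes.putIfAbsent(path, data) == null ? Code.OK : Code.NODE_EXISTS;
      cb.processResult(rc, path, ctx);
    });
  }

  /** Drains pending requests so that callers can inspect the results. */
  boolean shutdownAndWait() {
    eventThread.shutdown();
    try {
      return eventThread.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}

class SketchCandidate implements SketchAsyncStore.CreateCallback {
  final String rmId;
  volatile String state = "INIT";

  SketchCandidate(String rmId) { this.rmId = rmId; }

  @Override
  public void processResult(SketchAsyncStore.Code rc, String path, Object ctx) {
    // Created the node: we hold the lock. Node already there: someone else does.
    state = (rc == SketchAsyncStore.Code.OK) ? "ACTIVE" : "STANDBY";
  }
}
```

Two candidates that call create on the same path both return at once; after the event thread has drained, the first requester has state ACTIVE and the second STANDBY.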

ActiveStandbyElector implements this callback interface provided by ZooKeeper. When the create call above completes, ActiveStandbyElector#processResult runs asynchronously:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    // omitted
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      // Try to enter the Active state
      if (becomeActive()) {
        // Verify by monitoring the lock node
        monitorActiveStatus();
      } else {
        // Otherwise try again to create the ZooKeeper node and obtain the Active state
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }
    // Creation failed because the node already exists: enter the standby state
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        becomeStandby();
      }
      monitorActiveStatus();
      return;
    }

    // Creation failed and the node does not exist yet: retry
    if (shouldRetry(code)) {
      if (createRetryCount < maxRetryNum) {
        ++createRetryCount;
        createLockNodeAsync();
        return;
      }
      // omitted
    }

  }
}

5.6 Active Service startup

In the normal case, ActiveStandbyElector#becomeActive moves ResourceManager into the active state:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private boolean becomeActive() {
    // omitted
    appClient.becomeActive();
    // omitted
  }
}

appClient is the ActiveStandbyElectorBasedElectorService instance that was passed in when the ActiveStandbyElector object was created:

public class ActiveStandbyElectorBasedElectorService extends AbstractService
    implements EmbeddedElector,
    ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void becomeActive() throws ServiceFailedException {
    cancelDisconnectTimer();

    try {
      rm.getRMContext().getRMAdminService().transitionToActive(req);
    } catch (Exception e) {
      throw new ServiceFailedException("RM could not transition to Active", e);
    }
  }
}

It calls AdminService#transitionToActive to move the current ResourceManager into the Active state:

public class AdminService extends CompositeService implements HAServiceProtocol, ResourceManagerAdministrationProtocol {
  public synchronized void transitionToActive(
      HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
    // omitted
    rm.transitionToActive();
    // omitted
  }
}

AdminService in turn calls ResourceManager#transitionToActive, which runs startActiveServices to bring ResourceManager into the active state:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToActive() throws Exception {
    if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
      LOG.info("Already in active state");
      return;
    }
    LOG.info("Transitioning to active state");

    this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          startActiveServices();
          return null;
        } catch (Exception e) {
          reinitialize(true);
          throw e;
        }
      }
    });

    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
    LOG.info("Transitioned to active state");
  }
}

ResourceManager#startActiveServices is where the active services are actually started:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  void startActiveServices() throws Exception {
    if (activeServices != null) {
      clusterTimeStamp = System.currentTimeMillis();
      activeServices.start();
    }
  }
}

5.7 Switching to the standby state

When ResourceManager loses the election it enters the standby state. If it is in the active state at that moment, it stops the RMActiveServices service:

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToStandby(boolean initialize) throws Exception {
    if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY) {
      LOG.info("Already in standby state");
      return;
    }

    LOG.info("Transitioning to standby state");
    HAServiceState state = rmContext.getHAServiceState();
    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
    if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
      stopActiveServices();
      reinitialize(initialize);
    }
    LOG.info("Transitioned to standby state");
  }
}

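6. Summary

RMActiveServices is not a child service of ResourceManager. Its init/start/stop flow is independent of the ResourceManager child service flow:

  • Startup is driven by the ActiveStandbyElectorBasedElectorService election service.
  • State switching is driven by the ZooKeeper watcher.
  • Initialization and stopping are done through extra method calls in ResourceManager.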
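
As a closing illustration, the active/standby transitions can be modeled in a few lines. SketchHaResourceManager is hypothetical. Replacing the stopped ActiveServices instance with a fresh one is my assumption about the purpose of reinitialize, whose body the article does not show.

```java
/** Hypothetical model of the active/standby transitions summarized above. */
class SketchHaResourceManager {
  enum HaState { STANDBY, ACTIVE }

  /** Stand-in for RMActiveServices: once stopped it is not started again. */
  static class ActiveServices {
    boolean started;
    boolean stopped;

    void start() {
      if (stopped) {
        throw new IllegalStateException("stopped services cannot restart");
      }
      started = true;
    }

    void stop() {
      stopped = true;
      started = false;
    }
  }

  HaState state = HaState.STANDBY;
  ActiveServices activeServices = new ActiveServices();   // initialized up front
  int startCount = 0;

  synchronized void transitionToActive() {
    if (state == HaState.ACTIVE) {
      return;                                              // already active: no-op
    }
    activeServices.start();
    startCount++;
    state = HaState.ACTIVE;
  }

  synchronized void transitionToStandby() {
    if (state == HaState.STANDBY) {
      return;                                              // already standby: no-op
    }
    state = HaState.STANDBY;
    activeServices.stop();
    activeServices = new ActiveServices();                 // fresh instance for the next win
  }
}
```

Calling transitionToActive() twice starts the services once; a later transitionToStandby() stops them and prepares a new instance, so a subsequent election win can start cleanly.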
