Analysis of the Yarn State Machine Framework

Posted by 有山先生


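1. Introduction

The previous article analyzed Yarn's event-driven model and showed that Yarn processes events with a producer-consumer pattern: events are produced through GenericEventHandler#handle and consumed by custom Handler implementations. Consuming an event changes the state of objects in Yarn, and the complete set of state changes an object can go through is its state machine. This article describes how the Yarn state machine framework is implemented.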

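2. What is a state machine

A state machine, short for finite-state machine, is a mathematical model abstracted from the operating rules of real-world things: given a state machine, its current state, and an input, the next state can be computed unambiguously. Take an automatic door: given the initial state closed and the input "open the door", the next state can be determined.

[Figure: state transition diagram of the automatic door]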
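
The automatic door can be written down as a small lookup table. The following is a minimal sketch, not Yarn code: the names DoorState, DoorInput and DoorFsm are hypothetical, and the OPENED state is an assumption about what the door's second state is called.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical names throughout: DoorState, DoorInput and DoorFsm are illustrative only.
enum DoorState { CLOSED, OPENED }

enum DoorInput { OPEN, CLOSE }

final class DoorFsm {
  // Transition table: current state -> (input -> next state).
  private static final Map<DoorState, Map<DoorInput, DoorState>> TABLE =
      new EnumMap<>(DoorState.class);

  static {
    Map<DoorInput, DoorState> fromClosed = new EnumMap<>(DoorInput.class);
    fromClosed.put(DoorInput.OPEN, DoorState.OPENED);
    TABLE.put(DoorState.CLOSED, fromClosed);

    Map<DoorInput, DoorState> fromOpened = new EnumMap<>(DoorInput.class);
    fromOpened.put(DoorInput.CLOSE, DoorState.CLOSED);
    TABLE.put(DoorState.OPENED, fromOpened);
  }

  // Given the current state and an input, the next state is fully determined.
  static DoorState next(DoorState current, DoorInput input) {
    Map<DoorInput, DoorState> row = TABLE.get(current);
    DoorState target = (row == null) ? null : row.get(input);
    if (target == null) {
      throw new IllegalStateException("No transition from " + current + " on " + input);
    }
    return target;
  }

  public static void main(String[] args) {
    System.out.println(next(DoorState.CLOSED, DoorInput.OPEN)); // prints OPENED
  }
}
```

An input that has no entry for the current state (for example CLOSE while the door is already CLOSED) is rejected here with an exception; that choice is part of the sketch, not something the door example prescribes.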

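In Yarn, the two most important concepts of a state machine are State and Event. Take an application, RMApp: it starts in an initial state, and when an event is handled, the event type is matched to a Transition class that moves RMApp from its current state to a target state. Each step follows the flow: pre-state --> transition method --> target state. Collecting all such flows yields the state machine.

In Yarn, App, AppAttempt, Container, and Node are each modeled as a state machine. RMApp maintains the life cycle of an Application; RMAppAttempt maintains the life cycle of one attempt to run it; RMContainer maintains the life cycle of an allocated Container, the smallest unit of resources; RMNode maintains the life cycle of a NodeManager.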

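3. Why design a state machine

Any entity can have a large number of related events, with many different event types and pre-states. If these are not organized sensibly, the entity's state transition logic becomes complicated and cluttered. The Yarn state machine organizes the state transition flows so that the transition method for a given pre-state and event type can be found quickly.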

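4. When the state machine is used

When a Handler consumes events from the event queue, it uses the state machine to update the state of the corresponding object. Using the state machine takes two steps:

  1. The Service registers the Handler.
  2. The Handler uses the state machine.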

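4.1 The Service registers the Handler

Take the RMApp state machine as an example. The Active ResourceManager service manages the life cycle of RMApp objects, and the RMApp state machine manages RMApp state changes. In the Active ResourceManager initialization method ResourceManager$RMActiveServices#serviceInit, the Handler implementation ApplicationEventDispatcher is registered for events of type RMAppEventType and is responsible for handling them: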

public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      // omitted
      rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext));
      // omitted
    }
  }
}
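
To illustrate what registering a Handler for an event-type class amounts to, here is a minimal sketch of a dispatcher that routes events by the enum class of their type. It is an assumption-laden simplification: MiniDispatcher, MiniHandler, AppEventType and AppEvent are hypothetical names, there is no queue or dispatch thread, and it should not be read as the Yarn dispatcher implementation.

```java
import java.util.HashMap;
import java.util.Map;

// All names here (MiniDispatcher, MiniHandler, AppEventType, AppEvent) are hypothetical.
enum AppEventType { START, KILL }

final class AppEvent {
  final AppEventType type;
  final String appId;

  AppEvent(AppEventType type, String appId) {
    this.type = type;
    this.appId = appId;
  }
}

interface MiniHandler<E> {
  void handle(E event);
}

final class MiniDispatcher {
  // Event-type enum class -> handler responsible for that family of events.
  private final Map<Class<? extends Enum<?>>, MiniHandler<AppEvent>> handlers = new HashMap<>();

  void register(Class<? extends Enum<?>> eventTypeClass, MiniHandler<AppEvent> handler) {
    handlers.put(eventTypeClass, handler);
  }

  // Routes the event to the handler registered for the enum class of its type.
  void dispatch(AppEvent event) {
    MiniHandler<AppEvent> handler = handlers.get(event.type.getDeclaringClass());
    if (handler == null) {
      throw new IllegalStateException("No handler registered for " + event.type);
    }
    handler.handle(event);
  }
}
```

A call such as `dispatcher.register(AppEventType.class, e -> ...)` followed by `dispatcher.dispatch(new AppEvent(AppEventType.START, "app_1"))` would hand the event to that lambda.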

4.2 The Handler uses the state machine

ApplicationEventDispatcher implements the EventHandler interface. Its handle method does not process the RMAppEvent itself; it delegates to RMApp:

  public static final class ApplicationEventDispatcher implements EventHandler<RMAppEvent> {

    private final RMContext rmContext;

    public ApplicationEventDispatcher(RMContext rmContext) {
      this.rmContext = rmContext;
    }

    @Override
    public void handle(RMAppEvent event) {
      ApplicationId appID = event.getApplicationId();
      // rmContext holds the RMApp for this appID; the concrete type behind RMApp is its implementation class RMAppImpl
      RMApp rmApp = this.rmContext.getRMApps().get(appID);
      // omitted
      rmApp.handle(event);
      // omitted
    }
  }

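RMAppImpl#handle processes the RMAppEvent. RMAppImpl implements the RMApp interface, which in turn extends the EventHandler interface:

public interface RMApp extends EventHandler<RMAppEvent> ...

RMAppImpl is therefore a Handler too, and while handling an event it uses the state machine to change the state of the RMApp object: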

public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {
    // omitted
    this.stateMachine.doTransition(event.getType(), event);
    // omitted
  }
}

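As RMAppImpl#handle shows, RMAppImpl keeps a member variable stateMachine. This is the state machine: events are processed through it, and it changes the state of the RMAppImpl object.

5. State machine initialization

The class of the object being maintained is normally responsible for initializing the state machine. RMAppImpl, as a maintained application object, holds a StateMachineFactory member that builds the state machine. As shown below, StateMachineFactory#addTransition adds state transition methods and StateMachineFactory#installTopology builds the state machine's transition table: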

public class RMAppImpl implements RMApp, Recoverable {
  private static final StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent> stateMachineFactory =
      new StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent>(RMAppState.NEW)
          .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
          .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition())
          .addTransition(RMAppState.NEW,
              EnumSet.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
                  RMAppState.KILLED, RMAppState.FINAL_SAVING),
              RMAppEventType.RECOVER, new RMAppRecoveredTransition())
          // omitted
          .installTopology();
  // omitted
}

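StateMachineFactory is where the state machine is actually stored. It has two key member variables: transitionsListNode and stateMachineTable. transitionsListNode temporarily stores the registered Transitions; in the end they are popped one by one and placed into stateMachineTable. StateMachineFactory is defined as follows: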

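final public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  private final TransitionsListNode transitionsListNode;
  private Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;
  // The two fields below are assigned by the constructor; their declarations
  // were left out of the original excerpt and are inferred from that usage.
  private STATE defaultInitialState;
  private final boolean optimized;

  public StateMachineFactory(STATE defaultInitialState) {
    this.transitionsListNode = null;
    this.defaultInitialState = defaultInitialState;
    this.optimized = false;
    this.stateMachineTable = null;
  }
  // omitted
}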

5.1 How TransitionsListNode temporarily stores Transitions

TransitionsListNode is a linked list. Each node stores one state transition, of type ApplicableTransition:

  private class TransitionsListNode {
    final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
    final TransitionsListNode next;

    TransitionsListNode(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition, TransitionsListNode next) {
      this.transition = transition;
      this.next = next;
    }
  }

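ApplicableTransition is an interface with four type parameters: OPERAND, STATE, EVENTTYPE, EVENT. OPERAND is the object being operated on, STATE is the state type, EVENTTYPE is the event type, and EVENT is the event. The interface defines an apply method, which registers the transition held by a TransitionsListNode node into the final state machine mapping table.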

  private interface ApplicableTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
    void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
  }

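The concrete implementation of ApplicableTransition is ApplicableSingleOrMultipleTransition. It stores the actual Transition object, together with the preState and eventType the transition applies to. In other words, when the object is in state preState and an event of type eventType arrives, this Transition handles the event. The apply method puts these three pieces into the state machine mapping table. ApplicableSingleOrMultipleTransition is defined as follows: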

  static private class ApplicableSingleOrMultipleTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
          implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
    final STATE preState;
    final EVENTTYPE eventType;
    final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

    ApplicableSingleOrMultipleTransition
        (STATE preState, EVENTTYPE eventType,
         Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
      this.preState = preState;
      this.eventType = eventType;
      this.transition = transition;
    }

    @Override
    public void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
      // Look up the inner map for preState in the state machine table
      Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = subject.stateMachineTable.get(preState);
      if (transitionMap == null) {
        // I use HashMap here because I would expect most EVENTTYPEs to not
        //  apply out of a particular state, so FSM sizes would be 
        //  quadratic if I use EnumMaps here as I do at the top level.
        transitionMap = new HashMap<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
        subject.stateMachineTable.put(preState, transitionMap);
      }
      // Store the event type and its transition in the inner map
      transitionMap.put(eventType, transition);
    }
  }

The Transition interface defines the method that actually performs the state transition:

  private interface Transition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
    STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType);
  }

The first implementation of Transition is SingleInternalArc. It represents a transition that, from a given pre-state, can end in exactly one state:

  private class SingleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

    private STATE postState;
    private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

    SingleInternalArc(STATE postState,
        SingleArcTransition<OPERAND, EVENT> hook) {
      this.postState = postState;
      this.hook = hook;
    }

    @Override
    public STATE doTransition(OPERAND operand, STATE oldState,
                              EVENT event, EVENTTYPE eventType) {
      if (hook != null) {
        hook.transition(operand, event);
      }
      return postState;
    }
  }

The second implementation of Transition is MultipleInternalArc. It represents a transition whose end state is returned by the hook, depending on the result of executing it:

  private class MultipleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

    // Fields
    private Set<STATE> validPostStates;
    private MultipleArcTransition<OPERAND, EVENT, STATE> hook;  // transition hook

    MultipleInternalArc(Set<STATE> postStates, MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
      this.validPostStates = postStates;
      this.hook = hook;
    }

    @Override
    public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) throws InvalidStateTransitionException {
      STATE postState = hook.transition(operand, event);

      if (!validPostStates.contains(postState)) {
        throw new InvalidStateTransitionException(oldState, eventType);
      }
      return postState;
    }
  }

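The two Transition implementations above wrap SingleArcTransition and MultipleArcTransition respectively, and these are what actually encapsulate the transition logic. The transition objects passed to StateMachineFactory#addTransition are implementations of these two interfaces.

SingleArcTransition is defined as follows: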

public interface SingleArcTransition<OPERAND, EVENT> {
  // The end state is fixed, so the method returns void
  public void transition(OPERAND operand, EVENT event);
}

MultipleArcTransition is defined as follows:

public interface MultipleArcTransition<OPERAND, EVENT, STATE extends Enum<STATE>> {
  // The end state is not known in advance; it is determined by the result of the transition
  public STATE transition(OPERAND operand, EVENT event);
}

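They have many implementation classes. Among the transitions registered in RMAppImpl above, for example, RMAppNodeUpdateTransition is registered with a single end state, while RMAppRecoveredTransition is a MultipleArcTransition.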
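
To make the difference between the two hook styles concrete, the sketch below runs one hook of each kind. It uses simplified, hypothetical stand-ins (JobState, SingleHook, MultiHook, ArcDemo) rather than the Hadoop interfaces, and throws IllegalStateException where the framework would use its own exception type.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified stand-ins for the two hook styles; not the Hadoop interfaces.
enum JobState { NEW, RUNNING, SUCCEEDED, FAILED }

interface SingleHook<O, E> {
  void transition(O operand, E event);   // target state is fixed at registration time
}

interface MultiHook<O, E, S extends Enum<S>> {
  S transition(O operand, E event);      // target state is chosen by the hook
}

final class ArcDemo {
  static final Set<JobState> FINISH_STATES = EnumSet.of(JobState.SUCCEEDED, JobState.FAILED);

  // Single arc: run the hook (if any), then return the pre-registered target state.
  static <O, E> JobState runSingle(JobState post, SingleHook<O, E> hook, O operand, E event) {
    if (hook != null) {
      hook.transition(operand, event);
    }
    return post;
  }

  // Multiple arc: the hook returns the target, which must be one of the declared states.
  static <O, E> JobState runMulti(Set<JobState> validPost, MultiHook<O, E, JobState> hook,
                                  O operand, E event) {
    JobState post = hook.transition(operand, event);
    if (!validPost.contains(post)) {
      throw new IllegalStateException("Undeclared target state: " + post);
    }
    return post;
  }
}
```

With a single arc the caller already knows the outcome before the hook runs; with a multiple arc the declared set only bounds the outcome, and a hook that returns anything outside it is treated as an error.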

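5.2 stateMachineTable, the state machine

stateMachineTable is the state machine itself. It is a two-level Map: Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>. The key of the outer Map is the old state, the key of the inner Map is the event type, and the value of the inner Map is a Transition<OPERAND, STATE, EVENTTYPE, EVENT>, where OPERAND is the object being operated on, STATE is the state type, EVENTTYPE is the event type, and EVENT is the event. The table works as follows: RMAppImpl can be in many old states, and each old state can respond to several event types; given the old state and the type of the event to handle, the table yields the transition for that case, which both handles the event and determines the target state.

5.3 How StateMachineFactory builds the state machine

With the member variables transitionsListNode and stateMachineTable understood, we can look at how the state machine is built. The Handler implementation RMAppImpl registers its transitions through StateMachineFactory#addTransition. Implementations of SingleArcTransition, which have a single fixed end state, are registered through the first kind of addTransition overload below; implementations of MultipleArcTransition, whose end state is not known in advance, are registered through the second.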

5.3.1 Registering a SingleArcTransition implementation

  public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, STATE postState,
      Set<EVENTTYPE> eventTypes, SingleArcTransition<OPERAND, EVENT> hook) {
    StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> factory = null;
    // Register the same hook once per event type by delegating to the
    // addTransition overload that takes a single EVENTTYPE.
    for (EVENTTYPE event : eventTypes) {
      if (factory == null) {
        factory = addTransition(preState, postState, event, hook);
      } else {
        factory = factory.addTransition(preState, postState, event, hook);
      }
    }
    return factory;
  }

5.3.2 Registering a MultipleArcTransition implementation

  public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, Set<STATE> postStates,
      EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this,
        new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(preState, eventType, new MultipleInternalArc(postStates, hook)));
  }

addTransition calls a private constructor of StateMachineFactory. It creates a new StateMachineFactory whose transitionsListNode prepends the transition for the given preState and event type to the old factory's list:

  private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
    this.defaultInitialState = that.defaultInitialState;
    this.transitionsListNode = new TransitionsListNode(t, that.transitionsListNode);
    this.optimized = false;
    this.stateMachineTable = null;
  }
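
The pattern in this constructor, where every registration returns a fresh factory that shares the previous list as its tail, can be sketched in isolation. ChainBuilder and Node below are hypothetical names, and plain strings stand in for transitions; this is only an illustration of the prepend-and-share idea.

```java
// Hypothetical sketch: ChainBuilder and Node are illustrative names, not Hadoop classes.
final class ChainBuilder {
  private static final class Node {
    final String item;
    final Node next;

    Node(String item, Node next) {
      this.item = item;
      this.next = next;
    }
  }

  private final Node head;   // most recently added item, or null when empty

  ChainBuilder() {
    this.head = null;
  }

  private ChainBuilder(ChainBuilder that, String item) {
    // Prepend: the new builder shares the old builder's list as its tail.
    this.head = new Node(item, that.head);
  }

  // Returns a new builder; the receiver is left unchanged.
  ChainBuilder add(String item) {
    return new ChainBuilder(this, item);
  }

  int size() {
    int n = 0;
    for (Node cursor = head; cursor != null; cursor = cursor.next) {
      n++;
    }
    return n;
  }

  // Items in head-to-tail order, i.e. newest first.
  String describe() {
    StringBuilder sb = new StringBuilder();
    for (Node cursor = head; cursor != null; cursor = cursor.next) {
      if (sb.length() > 0) {
        sb.append(",");
      }
      sb.append(cursor.item);
    }
    return sb.toString();
  }
}
```

Because nothing is mutated, `new ChainBuilder().add("a").add("b")` leaves the intermediate builders intact, and the resulting list reads newest first ("b,a"). That reversed order is why a later step has to restore registration order before filling a table.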

5.3.3 Building the state machine mapping table

StateMachineFactory#addTransition only builds the TransitionsListNode list, which stores the transitions temporarily. StateMachineFactory#installTopology is what actually builds the state machine mapping table:

  public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> installTopology() {
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
  }

In this StateMachineFactory constructor, passing optimized as true means the state machine mapping table should be built from transitionsListNode, which is done by StateMachineFactory#makeStateMachineTable:

  private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, boolean optimized) {
    this.defaultInitialState = that.defaultInitialState;
    this.transitionsListNode = that.transitionsListNode;
    this.optimized = optimized;
    if (optimized) {
      makeStateMachineTable();
    } else {
      stateMachineTable = null;
    }
  }

StateMachineFactory#makeStateMachineTable pushes the linked list onto a stack, then pops each element and calls the apply method of its ApplicableTransition implementation to register the Transition in the state machine mapping table:

  private void makeStateMachineTable() {
    Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
      new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

    Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
      prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

    prototype.put(defaultInitialState, null);

    // I use EnumMap here because it'll be faster and denser.  I would
    //  expect most of the states to have at least one transition.
    stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

    for (TransitionsListNode cursor = transitionsListNode; cursor != null; cursor = cursor.next) {
      stack.push(cursor.transition);
    }

    while (!stack.isEmpty()) {
      stack.pop().apply(this);
    }
  }
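
The effect of replaying the list through a stack can be shown with a small sketch. It assumes, as the addTransition code suggests, that the list is newest-first; ReplayDemo and its Node class are hypothetical, and string keys such as "NEW/START" stand in for a (pre-state, event type) pair.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: ReplayDemo and Node are illustrative names only.
final class ReplayDemo {
  static final class Node {
    final String key;
    final String value;
    final Node next;

    Node(String key, String value, Node next) {
      this.key = key;
      this.value = value;
      this.next = next;
    }
  }

  // Builds a newest-first list from entries given in registration order.
  static Node chain(String[][] entries) {
    Node head = null;
    for (String[] entry : entries) {
      head = new Node(entry[0], entry[1], head);
    }
    return head;
  }

  // Pushes every node, then pops: the oldest registration is applied first,
  // so a later registration for the same key overwrites an earlier one.
  static Map<String, String> replay(Node head) {
    Deque<Node> stack = new ArrayDeque<>();
    for (Node cursor = head; cursor != null; cursor = cursor.next) {
      stack.push(cursor);
    }
    Map<String, String> table = new HashMap<>();
    while (!stack.isEmpty()) {
      Node node = stack.pop();
      table.put(node.key, node.value);
    }
    return table;
  }
}
```

Registering "NEW/START" twice, first with value "first" and then with "second", leaves "second" in the table, because popping the stack restores registration order and the later put wins.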

Each pop calls the apply method of ApplicableSingleOrMultipleTransition, the ApplicableTransition implementation, which registers the Transition in the state machine mapping table:

  static private class ApplicableSingleOrMultipleTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
          implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
    final STATE preState;
    final EVENTTYPE eventType;
    final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

    ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType, Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
      this.preState = preState;
      this.eventType = eventType;
      this.transition = transition;
    }

    @Override
    public void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
      Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
        = subject.stateMachineTable.get(preState);
      if (transitionMap == null) {
        // I use HashMap here because I would expect most EVENTTYPEs to not
        //  apply out of a particular state, so FSM sizes would be 
        //  quadratic if I use EnumMaps here as I do at the top level.
        transitionMap = new HashMap<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
        subject.stateMachineTable.put(preState, transitionMap);
      }
      transitionMap.put(eventType, transition);
    }
  }

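Once apply has been called on every element of the stack, the state machine is fully built, and its transitions can be looked up and executed.

6. Using the state machine (performing state transitions)

For an EventHandler, calling handle invokes StateMachine#doTransition to perform the state transition. RMAppImpl is one such EventHandler: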

public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {

    this.writeLock.lock();
    try {
      ApplicationId appID = event.getApplicationId();
      final RMAppState oldState = getState();
      // omitted
      this.stateMachine.doTransition(event.getType(), event);
      // omitted
    } finally {
      this.writeLock.unlock();
    }
  }
}

StateMachineFactory$InternalStateMachine#doTransition acts as an intermediate method that adds listener logic around the transition, somewhat like AOP. It calls StateMachineFactory#doTransition to perform the state transition and returns the resulting state:

    public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event) throws InvalidStateTransitionException {
      listener.preTransition(operand, currentState, event);
      STATE oldState = currentState;
      currentState = StateMachineFactory.this.doTransition(operand, currentState, eventType, event);
      listener.postTransition(operand, oldState, currentState, event);
      return currentState;
    }
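
The wrapping of the core transition with listener callbacks can be sketched on its own. The example below is a simplified illustration under stated assumptions: Light, StepListener, ListenedMachine and RecordingListener are hypothetical, and the "core transition" just toggles between two states instead of consulting a table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names: Light, StepListener, ListenedMachine and RecordingListener
// are illustrative and do not exist in Hadoop.
enum Light { RED, GREEN }

interface StepListener {
  void preTransition(Light current, String event);
  void postTransition(Light before, Light after, String event);
}

final class ListenedMachine {
  private final StepListener listener;
  private Light current = Light.RED;

  ListenedMachine(StepListener listener) {
    this.listener = listener;
  }

  // Stand-in for the table lookup: every event simply toggles the light.
  private Light coreTransition(Light state, String event) {
    return state == Light.RED ? Light.GREEN : Light.RED;
  }

  // Wrapper: notify before, run the core transition, notify after.
  synchronized Light doTransition(String event) {
    listener.preTransition(current, event);
    Light before = current;
    current = coreTransition(current, event);
    listener.postTransition(before, current, event);
    return current;
  }
}

final class RecordingListener implements StepListener {
  final List<String> calls = new ArrayList<>();

  @Override
  public void preTransition(Light current, String event) {
    calls.add("pre:" + current);
  }

  @Override
  public void postTransition(Light before, Light after, String event) {
    calls.add("post:" + before + "->" + after);
  }
}
```

Keeping the callbacks in the wrapper means the transition logic itself stays unaware of listeners, which is the AOP-like quality mentioned above.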

StateMachineFactory#doTransition uses the pre-state and the event type to find the matching Transition implementation, that is, a SingleInternalArc or a MultipleInternalArc. These wrap a SingleArcTransition or a MultipleArcTransition respectively, whose implementation is the transition registered by the user. The state transition is then executed:

  private STATE doTransition(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event) throws InvalidStateTransitionException {
    Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = stateMachineTable.get(oldState);
    if (transitionMap != null) {
      Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
          = transitionMap.get(eventType);
      if (transition != null) {
        return transition.doTransition(operand, oldState, event, eventType);
      }
    }
    throw new InvalidStateTransitionException(oldState, eventType);
  }

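What finally runs is the custom Transition registered by the user, for example RMAppRecoveredTransition. After handling the event, it returns the resulting application state: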

  private static final class RMAppRecoveredTransition implements
      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {

      RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
      app.recover(recoverEvent.getRMState());
      // The app has completed.
      if (app.recoveredFinalState != null) {
        app.recoverAppAttempts();
        new FinalTransition(app.recoveredFinalState).transition(app, event);
        return app.recoveredFinalState;
      }

      if (UserGroupInformation.isSecurityEnabled()) {
        // asynchronously renew delegation token on recovery.
        try {
          app.rmContext.getDelegationTokenRenewer()
              .addApplicationAsyncDuringRecovery(app.getApplicationId(),
                  BuilderUtils.parseCredentials(app.submissionContext),
                  app.submissionContext.getCancelTokensWhenComplete(),
                  app.getUser(),
                  BuilderUtils.parseTokensConf(app.submissionContext));
        } catch (Exception e) {
          String msg = "Failed to fetch user credentials from application:" + e
              .getMessage();
          app.diagnostics.append(msg);
          LOG.error(msg, e);
        }
      }

      for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts
          .entrySet()) {
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            timeout.getKey(), timeout.getValue());
        if (LOG.isDebugEnabled()) {
          long remainingTime = timeout.getValue() - app.systemClock.getTime();
          LOG.debug("Application " + app.applicationId
              + " is registered for timeout monitor, type=" + timeout.getKey()
              + " remaining timeout=" + (remainingTime > 0 ?
              remainingTime / 1000 :
              0) + " seconds");
        }
      }

      // No existent attempts means the attempt associated with this app was not
      // started or started but not yet saved.
      if (app.attempts.isEmpty()) {
        app.scheduler.handle(
            new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
                app.applicationPriority, app.placementContext));
        return RMAppState.SUBMITTED;
      }

      // Add application to scheduler synchronously to guarantee scheduler
      // knows applications before AM or NM re-registers.
      app.scheduler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
              app.applicationPriority, app.placementContext));

      // recover attempts
      app.recoverAppAttempts();

      // YARN-1507 is saving the application state after the application is
      // accepted. So after YARN-1507, an app is saved meaning it is accepted.
      // Thus we return ACCEPTED state on recovery.
      return RMAppState.ACCEPTED;
    }
  }

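7. Summary

StateMachineFactory builds a lookup table of the form pre-state preState -> event type eventType -> state transition method Transition, which organizes the many different transition methods in an orderly way. This lookup table is the state machine.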
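
As a compact recap, the sketch below assembles such a pre-state -> event type -> transition table end to end. It is a deliberately reduced illustration: MiniFsm, TaskState and TaskEventType are hypothetical names, only fixed-target transitions are supported, and the builder mutates itself instead of returning a new factory per registration as StateMachineFactory does.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical names: MiniFsm, TaskState and TaskEventType are illustrative only.
enum TaskState { NEW, RUNNING, DONE }

enum TaskEventType { START, FINISH }

final class MiniFsm<O> {
  // One table entry: the fixed target state plus an optional hook to run.
  private static final class Arc<O> {
    final TaskState post;
    final BiConsumer<O, TaskEventType> hook;

    Arc(TaskState post, BiConsumer<O, TaskEventType> hook) {
      this.post = post;
      this.hook = hook;
    }
  }

  // Outer key: pre-state. Inner key: event type. Value: the transition to run.
  private final Map<TaskState, Map<TaskEventType, Arc<O>>> table =
      new EnumMap<>(TaskState.class);

  MiniFsm<O> addTransition(TaskState pre, TaskState post, TaskEventType type,
                           BiConsumer<O, TaskEventType> hook) {
    table.computeIfAbsent(pre, k -> new HashMap<>()).put(type, new Arc<>(post, hook));
    return this;
  }

  // Looks up (current state, event type), runs the hook, returns the target state.
  TaskState doTransition(O operand, TaskState current, TaskEventType type) {
    Map<TaskEventType, Arc<O>> row = table.get(current);
    Arc<O> arc = (row == null) ? null : row.get(type);
    if (arc == null) {
      throw new IllegalStateException("Invalid event " + type + " at " + current);
    }
    if (arc.hook != null) {
      arc.hook.accept(operand, type);
    }
    return arc.post;
  }
}
```

For example, after registering NEW -> RUNNING on START and RUNNING -> DONE on FINISH, calling doTransition with (NEW, START) runs the START hook and returns RUNNING, while (DONE, START) is rejected because the table has no entry for it.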
