Yarn State Machine Framework Analysis
Posted by 有山先生
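Foreword: This article was compiled by the editors of 小常识网 (cha138.com). It covers the analysis of the Yarn state machine framework, and we hope it is of some help to you.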
1. Preface
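The previous article analyzed Yarn's event-driven model framework and showed that Yarn processes events with the producer-consumer pattern: events are produced through GenericEventHandler#handle and consumed by custom Handler implementations. Consuming an event can change the state of an object in Yarn, and the collection of all of an object's possible state changes is its state machine. This article describes how Yarn's state machine framework is implemented.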
2. What Is a State Machine
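A state machine (short for finite-state automaton) is a mathematical model abstracted from the rules by which real-world things operate: given a state machine together with its current state and an input, the next state can be determined unambiguously. For example, for an automatic door whose initial state is closed, given the input "open the door", the next state can be computed. The state transition diagram of the automatic door is shown below:

[Figure: state transition diagram of the automatic door]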
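The automatic-door example can be written down as a deterministic transition function. This is a minimal sketch: the second state (OPEN) and the CLOSE_DOOR input are assumptions, since the text only names the closed state and the "open the door" input.

```java
/** States of a hypothetical automatic door (OPEN is an assumed second state). */
enum DoorState { CLOSED, OPEN }

/** Inputs the door can receive (illustrative names). */
enum DoorInput { OPEN_DOOR, CLOSE_DOOR }

final class AutomaticDoor {
    private AutomaticDoor() {}

    /**
     * Deterministic transition function: the same (state, input) pair
     * always yields the same next state.
     */
    static DoorState next(DoorState current, DoorInput input) {
        switch (current) {
            case CLOSED:
                return input == DoorInput.OPEN_DOOR ? DoorState.OPEN : DoorState.CLOSED;
            case OPEN:
                return input == DoorInput.CLOSE_DOOR ? DoorState.CLOSED : DoorState.OPEN;
            default:
                throw new IllegalStateException("Unknown state: " + current);
        }
    }
}
```

The whole "machine" is the function `next`; Yarn's framework replaces such hand-written branching with a lookup table, as the following sections show.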
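In Yarn, the two most important concepts of a state machine are State and Event. For example, an application RMApp starts in an initial state; when it handles an event, the transition class (Transition) matching the event type is selected and moves the RMApp from its current state to a target state. Each step an RMApp goes through is therefore: current state --> transition method --> target state, and the collection of all such steps is the state machine.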
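In Yarn, App, AppAttempt, Container and Node can all be represented as state machines. RMApp maintains the lifecycle of an Application; RMAppAttempt maintains the lifecycle of a single run attempt; RMContainer maintains the lifecycle of a Container, the smallest unit of allocated resources; RMNode maintains the lifecycle of a NodeManager.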
3. Why Design a State Machine
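Any entity may be associated with a great many events, with many different event types and starting states. Without a sensible organization, the entity's state transition logic becomes complex and tangled. Yarn's state machine organizes these transitions so that the transition method for a given starting state and event type can be found quickly.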
4. When Is the State Machine Used
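The state machine is used when a Handler consumes events from the event queue and updates the state of the corresponding object. Using it involves two steps:
- Step 1: a Service registers a Handler.
- Step 2: the Handler uses the state machine.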
4.1 The Service Registers a Handler
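Take the RMApp state machine as an example. The Active ResourceManager services manage the lifecycle of RMApp objects, and the RMApp state machine manages the state changes of each RMApp. In ResourceManager$RMActiveServices#serviceInit, the initialization method of the Active ResourceManager, the Handler implementation ApplicationEventDispatcher is registered for events of type RMAppEventType; ApplicationEventDispatcher is responsible for handling RMAppEventType events: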
```java
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      // ... omitted
      rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext));
      // ... omitted
    }
  }
}
```
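The registration above amounts to a map from an event-type enum class to a handler. Below is a minimal synchronous sketch under stated assumptions: TypedEvent, Handler and SimpleDispatcher are hypothetical names, not Yarn classes. Yarn's real AsyncDispatcher additionally buffers events in a queue and dispatches them on a separate thread, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical event whose type is an enum constant. */
interface TypedEvent<T extends Enum<T>> {
    T getType();
}

/** Hypothetical handler interface, analogous to EventHandler. */
interface Handler<E> {
    void handle(E event);
}

/** Simplified, synchronous dispatcher: one handler per event-type enum class. */
final class SimpleDispatcher {
    private final Map<Class<?>, Handler<TypedEvent<?>>> handlers = new HashMap<>();

    /** Plays the role of rmDispatcher.register(RMAppEventType.class, handler). */
    void register(Class<? extends Enum<?>> eventTypeClass, Handler<TypedEvent<?>> handler) {
        handlers.put(eventTypeClass, handler);
    }

    /** Routes an event to the handler registered for its type's enum class. */
    void dispatch(TypedEvent<?> event) {
        // getDeclaringClass() (not getClass()) also works for enum constants that have bodies
        Class<?> key = event.getType().getDeclaringClass();
        Handler<TypedEvent<?>> handler = handlers.get(key);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + key.getName());
        }
        handler.handle(event);
    }
}
```

Keying by the enum class means every constant of RMAppEventType reaches the same handler, which is why one `register` call per event family is enough.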
4.2 The Handler Uses the State Machine
ApplicationEventDispatcher implements the EventHandler interface, but its handle method does not actually process the RMAppEvent itself; it hands the event over to the RMApp:
```java
public static final class ApplicationEventDispatcher implements EventHandler<RMAppEvent> {
  private final RMContext rmContext;

  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    // rmContext holds the RMApp for each appId; its runtime type is the implementation class RMAppImpl
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    // ... omitted
    rmApp.handle(event);
    // ... omitted
  }
}
```
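RMAppImpl#handle processes the RMAppEvent. RMAppImpl implements the RMApp interface, and RMApp itself extends the EventHandler interface:

```java
public interface RMApp extends EventHandler<RMAppEvent> {
  // ...
}
```

So RMAppImpl is also a Handler, and while handling an event it uses the state machine to change the state of the RMApp object: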
```java
public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {
    // ... omitted
    this.stateMachine.doTransition(event.getType(), event);
    // ... omitted
  }
}
```
As RMAppImpl#handle shows, RMAppImpl holds a member variable stateMachine, which is the state machine: events are processed through it, and it moves the RMAppImpl object from one state to the next.
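Put together, the delegation chain of this section (the dispatcher registered for the event type finds the target object by ID, and that object, itself a handler, forwards the event to its own state machine) can be sketched in plain Java. ToyApp, ToyAppEvent, ToyAppDispatcher and the two enums are hypothetical stand-ins for RMAppImpl, RMAppEvent, ApplicationEventDispatcher, RMAppState and RMAppEventType, and the hard-coded `doTransition` method only takes the place of the StateMachine that section 5 builds.

```java
import java.util.HashMap;
import java.util.Map;

enum ToyAppState { NEW, SUBMITTED, KILLED }
enum ToyAppEventType { START, KILL }

/** Hypothetical event carrying the target application's ID, like RMAppEvent. */
final class ToyAppEvent {
    final String appId;
    final ToyAppEventType type;

    ToyAppEvent(String appId, ToyAppEventType type) {
        this.appId = appId;
        this.type = type;
    }
}

/** Stand-in for RMAppImpl: the entity is itself a handler and owns its state. */
final class ToyApp {
    private ToyAppState state = ToyAppState.NEW;

    synchronized ToyAppState getState() {
        return state;
    }

    /** Like RMAppImpl#handle: pass the event type on to the state machine. */
    synchronized void handle(ToyAppEvent event) {
        state = doTransition(state, event.type);
    }

    // Tiny hard-coded stand-in for stateMachine.doTransition(...)
    private static ToyAppState doTransition(ToyAppState current, ToyAppEventType type) {
        if (current == ToyAppState.NEW && type == ToyAppEventType.START) {
            return ToyAppState.SUBMITTED;
        }
        if (current != ToyAppState.KILLED && type == ToyAppEventType.KILL) {
            return ToyAppState.KILLED;
        }
        throw new IllegalStateException("Invalid event " + type + " at " + current);
    }
}

/** Like ApplicationEventDispatcher: finds the entity and delegates, never changes state itself. */
final class ToyAppDispatcher {
    // plays the role of rmContext.getRMApps()
    private final Map<String, ToyApp> apps = new HashMap<>();

    void addApp(String appId, ToyApp app) {
        apps.put(appId, app);
    }

    void handle(ToyAppEvent event) {
        ToyApp app = apps.get(event.appId);
        if (app != null) { // unknown IDs are simply ignored in this sketch
            app.handle(event);
        }
    }
}
```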
5. State Machine Initialization
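The class of the object being managed is generally responsible for initializing its state machine. For example, RMAppImpl, which represents a managed application, holds a StateMachineFactory member that builds the state machine. As shown below, StateMachineFactory#addTransition adds state transitions and StateMachineFactory#installTopology creates the state machine: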
```java
public class RMAppImpl implements RMApp, Recoverable {
  private static final StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent> stateMachineFactory =
      new StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent>(RMAppState.NEW)
          .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
          .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition())
          .addTransition(RMAppState.NEW,
              EnumSet.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
                  RMAppState.KILLED, RMAppState.FINAL_SAVING),
              RMAppEventType.RECOVER, new RMAppRecoveredTransition())
          // ... omitted
          .installTopology();
  // ... omitted
}
```
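StateMachineFactory is where the state machine is actually stored. Its two key member variables are transitionsListNode and stateMachineTable: transitionsListNode temporarily holds the registered transitions, which are eventually taken out of it and stored in stateMachineTable. StateMachineFactory is defined as follows: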
```java
final public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  private final TransitionsListNode transitionsListNode;
  private Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;
  private STATE defaultInitialState;
  private final boolean optimized;

  public StateMachineFactory(STATE defaultInitialState) {
    this.transitionsListNode = null;
    this.defaultInitialState = defaultInitialState;
    this.optimized = false;
    this.stateMachineTable = null;
  }
  // ... omitted
}
```
5.1 TransitionsListNode Temporarily Stores Transitions
TransitionsListNode is a linked list whose nodes store state transitions of type ApplicableTransition:
```java
private class TransitionsListNode {
  final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
  final TransitionsListNode next;

  TransitionsListNode(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition, TransitionsListNode next) {
    this.transition = transition;
    this.next = next;
  }
}
```
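ApplicableTransition is an interface with four type parameters: OPERAND, STATE, EVENTTYPE and EVENT. OPERAND is the object being operated on, STATE is the state enum type (used for both the source and the target state), EVENTTYPE is the event type, and EVENT is the event. The interface defines an apply method, which registers the transition held by a list node into the final state machine table of the given factory (subject).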
```java
private interface ApplicableTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
}
```
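The concrete implementation of ApplicableTransition is ApplicableSingleOrMultipleTransition. It stores the actual state transition object (Transition) together with preState and eventType, the source state and event type the transition applies to: when the object is in preState and receives an event of type eventType, this Transition handles the event. The apply method puts this entry into the state machine table. ApplicableSingleOrMultipleTransition is defined as follows: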
```java
static private class ApplicableSingleOrMultipleTransition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
    implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
  final STATE preState;
  final EVENTTYPE eventType;
  final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

  ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
    this.preState = preState;
    this.eventType = eventType;
    this.transition = transition;
  }

  @Override
  public void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
    // Get the event-type map for preState from the state machine table
    Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = subject.stateMachineTable.get(preState);
    if (transitionMap == null) {
      // I use HashMap here because I would expect most EVENTTYPEs to not
      // apply out of a particular state, so FSM sizes would be
      // quadratic if I use EnumMaps here as I do at the top level.
      transitionMap = new HashMap<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
      subject.stateMachineTable.put(preState, transitionMap);
    }
    // Put the event type and its transition into that map
    transitionMap.put(eventType, transition);
  }
}
```
The Transition interface defines the method that actually performs a state transition:
```java
private interface Transition<OPERAND, STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType);
}
```
The first implementation of Transition is SingleInternalArc. It represents a transition after which there is exactly one possible target state:
```java
private class SingleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
  private STATE postState;
  private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

  SingleInternalArc(STATE postState, SingleArcTransition<OPERAND, EVENT> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) {
    if (hook != null) {
      hook.transition(operand, event);
    }
    return postState;
  }
}
```
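The second implementation of Transition is MultipleInternalArc. Here the target state is decided by the result of the transition hook, and it must be one of the declared valid target states: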
```java
private class MultipleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
  // Fields
  private Set<STATE> validPostStates;
  private MultipleArcTransition<OPERAND, EVENT, STATE> hook; // transition hook

  MultipleInternalArc(Set<STATE> postStates, MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
    this.validPostStates = postStates;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) throws InvalidStateTransitionException {
    STATE postState = hook.transition(operand, event);
    if (!validPostStates.contains(postState)) {
      throw new InvalidStateTransitionException(oldState, eventType);
    }
    return postState;
  }
}
```
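The difference between the two Transition implementations can be reduced to a few lines. This is a simplified model, not the Hadoop classes: Step and Arcs are hypothetical names, hooks are `java.util.function` lambdas instead of SingleArcTransition/MultipleArcTransition, and an undeclared target state raises IllegalStateException instead of InvalidStateTransitionException.

```java
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/** Hypothetical stand-in for the Transition interface. */
interface Step<O, S extends Enum<S>, E> {
    S apply(O operand, S oldState, E event);
}

final class Arcs {
    private Arcs() {}

    /** Like SingleInternalArc: run the optional hook, then always return the fixed postState. */
    static <O, S extends Enum<S>, E> Step<O, S, E> single(S postState, BiConsumer<O, E> hook) {
        return (operand, oldState, event) -> {
            if (hook != null) {
                hook.accept(operand, event);
            }
            return postState;
        };
    }

    /** Like MultipleInternalArc: the hook chooses the state, which must be one of the declared ones. */
    static <O, S extends Enum<S>, E> Step<O, S, E> multiple(Set<S> validPostStates, BiFunction<O, E, S> hook) {
        return (operand, oldState, event) -> {
            S postState = hook.apply(operand, event);
            if (!validPostStates.contains(postState)) {
                throw new IllegalStateException(
                    "Hook returned undeclared state " + postState + " from " + oldState);
            }
            return postState;
        };
    }
}
```

The validation in `multiple` is what makes the EnumSet passed to addTransition meaningful: it documents every state the hook may return and rejects anything else at run time.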
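The Transition implementations above wrap SingleArcTransition and MultipleArcTransition respectively, and these two interfaces are what actually encapsulate the transition logic. The hook objects passed to StateMachineFactory#addTransition are implementations of them.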
SingleArcTransition is defined as follows:
```java
public interface SingleArcTransition<OPERAND, EVENT> {
  // The target state is fixed, so nothing needs to be returned
  public void transition(OPERAND operand, EVENT event);
}
```
MultipleArcTransition is defined as follows:
```java
public interface MultipleArcTransition<OPERAND, EVENT, STATE extends Enum<STATE>> {
  // The target state is not known in advance; it is decided by the result and returned
  public STATE transition(OPERAND operand, EVENT event);
}
```
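Both interfaces have many implementations. In RMAppImpl, for example, RMAppNodeUpdateTransition and RMAppNewlySavingTransition are SingleArcTransition implementations, while RMAppRecoveredTransition is a MultipleArcTransition implementation.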
5.2 stateMachineTable: The State Machine Table
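stateMachineTable is the state machine itself. Its type is a two-level map, Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>: the key of the outer map is the old (current) state, the key of the inner map is the event type, and the value of the inner map is a Transition<OPERAND, STATE, EVENTTYPE, EVENT>, where OPERAND is the object being operated on, STATE is the state type, EVENTTYPE is the event type and EVENT is the event. The table works like this: an RMAppImpl can be in many old states, and each old state may accept many event types; given the old state and the type of the event being handled, the table yields the transition for that case, which both processes the event and determines the target state.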
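The table layout and the registration logic of `apply` can be isolated into a small generic class. TwoLevelTable is a hypothetical name and `V` stands in for the Transition type; the EnumMap outer / HashMap inner split follows the comments in the Hadoop code quoted above.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the stateMachineTable layout: outer EnumMap keyed by the old
 * state, inner HashMap keyed by event type, value = the transition.
 */
final class TwoLevelTable<S extends Enum<S>, T extends Enum<T>, V> {
    private final Map<S, Map<T, V>> table;

    TwoLevelTable(Class<S> stateClass) {
        this.table = new EnumMap<>(stateClass);
    }

    /** Same steps as ApplicableSingleOrMultipleTransition#apply. */
    void put(S oldState, T eventType, V transition) {
        Map<T, V> transitionMap = table.get(oldState);
        if (transitionMap == null) {
            // inner map created lazily, only for states that have transitions
            transitionMap = new HashMap<>();
            table.put(oldState, transitionMap);
        }
        transitionMap.put(eventType, transition);
    }

    /** Returns the transition for (oldState, eventType), or null if none is registered. */
    V get(S oldState, T eventType) {
        Map<T, V> transitionMap = table.get(oldState);
        return transitionMap == null ? null : transitionMap.get(eventType);
    }
}
```

For RMApp, `S` and `T` would correspond to RMAppState and RMAppEventType; a lookup costs two hash/array probes regardless of how many transitions are registered.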
5.3 How StateMachineFactory Builds the State Machine
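With the member variables transitionsListNode and stateMachineTable understood, we can look at how the state machine is built. The Handler implementation RMAppImpl registers its transitions through StateMachineFactory#addTransition. Hooks of type SingleArcTransition, which have a single target state, are registered with the first addTransition method below (this variant takes a set of event types and registers the same hook for each one by calling the single-event overload); hooks of type MultipleArcTransition, whose target state is not known in advance, are registered with the second.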
5.3.1 Registering a SingleArcTransition Implementation
```java
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, STATE postState,
    Set<EVENTTYPE> eventTypes, SingleArcTransition<OPERAND, EVENT> hook) {
  StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> factory = null;
  for (EVENTTYPE event : eventTypes) {
    if (factory == null) {
      factory = addTransition(preState, postState, event, hook);
    } else {
      factory = factory.addTransition(preState, postState, event, hook);
    }
  }
  return factory;
}
```
5.3.2 Registering a MultipleArcTransition Implementation
```java
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(STATE preState, Set<STATE> postStates,
    EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this,
      new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(preState, eventType,
          new MultipleInternalArc(postStates, hook)));
}
```
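The private StateMachineFactory constructor it calls does not modify the old factory. Instead it creates a new factory whose transitionsListNode is a new node, holding the transition for the given preState and event type, prepended to the old factory's list: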
```java
private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
    ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode = new TransitionsListNode(t, that.transitionsListNode);
  this.optimized = false;
  this.stateMachineTable = null;
}
```
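Because addTransition always returns a new factory whose list node points at the previous list, the builder is an immutable (persistent) linked list. The sketch below reproduces just that structure; ImmutableRuleList and its string "rules" are hypothetical simplifications of StateMachineFactory and ApplicableTransition.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical immutable builder mirroring StateMachineFactory#addTransition:
 * each add() returns a NEW instance whose head node points at the old list,
 * so earlier instances are never modified.
 */
final class ImmutableRuleList {
    /** One registered rule; next points at the previously registered ones. */
    private static final class Node {
        final String rule;
        final Node next;

        Node(String rule, Node next) {
            this.rule = rule;
            this.next = next;
        }
    }

    private final Node head; // most recently added rule first

    ImmutableRuleList() {
        this(null);
    }

    private ImmutableRuleList(Node head) {
        this.head = head;
    }

    /** Like addTransition: O(1) prepend, returning a new instance. */
    ImmutableRuleList add(String rule) {
        return new ImmutableRuleList(new Node(rule, head));
    }

    /** Rules in list traversal order, i.e. newest first. */
    List<String> newestFirst() {
        List<String> out = new ArrayList<>();
        for (Node cursor = head; cursor != null; cursor = cursor.next) {
            out.add(cursor.rule);
        }
        return out;
    }
}
```

One plausible benefit of this design is that every intermediate factory in the fluent chain stays valid and prepending costs O(1); the price is that the list ends up newest-first, which matters when the table is built.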
5.3.3 Building the State Machine Table
StateMachineFactory#addTransition only builds up the TransitionsListNode list, which merely holds the transitions temporarily; StateMachineFactory#installTopology is what actually builds the state machine table:
```java
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> installTopology() {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}
```
When this StateMachineFactory constructor is called with optimized set to true, it builds the state machine table from transitionsListNode by calling StateMachineFactory#makeStateMachineTable:
```java
private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, boolean optimized) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode = that.transitionsListNode;
  this.optimized = optimized;
  if (optimized) {
    makeStateMachineTable();
  } else {
    stateMachineTable = null;
  }
}
```
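StateMachineFactory#makeStateMachineTable pushes the list's nodes onto a stack, then pops them one by one and calls each ApplicableTransition's apply method to register its Transition in the state machine table. Because the list holds the most recently added transition first, the stack reverses it, so the transitions are applied in the order in which they were registered: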
```java
private void makeStateMachineTable() {
  Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
      new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

  Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
      prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

  // EnumMap's copy constructor needs at least one entry to determine the key type
  // when it is given a non-EnumMap map, so the initial state is added as a seed entry
  prototype.put(defaultInitialState, null);

  // I use EnumMap here because it'll be faster and denser. I would
  // expect most of the states to have at least one transition.
  stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

  // The list is newest-first; pushing every node reverses the order
  for (TransitionsListNode cursor = transitionsListNode; cursor != null; cursor = cursor.next) {
    stack.push(cursor.transition);
  }

  // Pop oldest-first and register each transition in stateMachineTable
  while (!stack.isEmpty()) {
    stack.pop().apply(this);
  }
}
```
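The effect of the stack can be checked in isolation: registrations are replayed in their original order, so if the same (state, event type) pair were registered twice, the later registration overwrites the earlier one in the inner map. The sketch below models this with hypothetical Phase/Trigger enums, a hypothetical Pending node class and a TableBuilder helper; it uses ArrayDeque, the usual modern replacement for `java.util.Stack`, and stores target states directly instead of Transition objects.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

enum Phase { NEW, RUNNING, DONE }
enum Trigger { START, FINISH }

/** Hypothetical pending registration: a node in a newest-first linked list. */
final class Pending {
    final Phase pre;
    final Trigger trigger;
    final Phase post;
    final Pending next;

    Pending(Phase pre, Trigger trigger, Phase post, Pending next) {
        this.pre = pre;
        this.trigger = trigger;
        this.post = post;
        this.next = next;
    }
}

final class TableBuilder {
    private TableBuilder() {}

    /** Mirrors makeStateMachineTable: reverse with a stack, then apply in registration order. */
    static Map<Phase, Map<Trigger, Phase>> build(Pending head) {
        Deque<Pending> stack = new ArrayDeque<>();
        for (Pending cursor = head; cursor != null; cursor = cursor.next) {
            stack.push(cursor); // newest is pushed first, so it ends up at the bottom
        }
        Map<Phase, Map<Trigger, Phase>> table = new EnumMap<>(Phase.class);
        while (!stack.isEmpty()) {
            Pending p = stack.pop(); // oldest registration comes out first
            table.computeIfAbsent(p.pre, k -> new HashMap<>()).put(p.trigger, p.post);
        }
        return table;
    }
}
```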
Each popped element is an ApplicableSingleOrMultipleTransition, the ApplicableTransition implementation shown in section 5.1, and its apply method registers the wrapped Transition in the state machine table.
Once apply has been called on every element of the stack, the state machine is built, and its transitions can then be looked up and executed.
6. Using the State Machine (Executing Transitions)
For an EventHandler such as RMAppImpl, calling handle invokes StateMachine#doTransition to perform the state transition:
```java
public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {
    this.writeLock.lock();
    try {
      ApplicationId appID = event.getApplicationId();
      final RMAppState oldState = getState();
      // ... omitted
      this.stateMachine.doTransition(event.getType(), event);
      // ... omitted
    } finally {
      this.writeLock.unlock();
    }
  }
}
```
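StateMachineFactory$InternalStateMachine#doTransition is an intermediate method that wraps listener callbacks around the transition, somewhat like AOP. It calls StateMachineFactory#doTransition to perform the transition and returns the resulting state: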
```java
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event) throws InvalidStateTransitionException {
  listener.preTransition(operand, currentState, event);
  STATE oldState = currentState;
  currentState = StateMachineFactory.this.doTransition(operand, currentState, eventType, event);
  listener.postTransition(operand, oldState, currentState, event);
  return currentState;
}
```
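StateMachineFactory#doTransition uses the current state and the event type to find the matching Transition implementation, i.e. SingleInternalArc or MultipleInternalArc. These wrap a SingleArcTransition or MultipleArcTransition respectively, whose implementations are the transition hooks registered by the user. Finally, the transition is executed: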
```java
private STATE doTransition(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitionException {
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = stateMachineTable.get(oldState);
  if (transitionMap != null) {
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition = transitionMap.get(eventType);
    if (transition != null) {
      return transition.doTransition(operand, oldState, event, eventType);
    }
  }
  throw new InvalidStateTransitionException(oldState, eventType);
}
```
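The two layers of the runtime path (listener callbacks around the call, then a two-step table lookup that either finds a transition or throws) can be modeled together. Everything here is hypothetical: JobState, JobEventType, JobListener, BadTransitionException and JobStateMachine are illustration-only names, the table stores target states directly instead of Transition objects, and the exception is unchecked for brevity.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

enum JobState { NEW, RUNNING, SUCCEEDED }
enum JobEventType { START, COMPLETE }

/** Hypothetical unchecked exception in the role of InvalidStateTransitionException. */
class BadTransitionException extends RuntimeException {
    BadTransitionException(JobState state, JobEventType type) {
        super("Invalid event: " + type + " at " + state);
    }
}

/** Hypothetical listener with the same two hook points as the listener above. */
interface JobListener {
    void preTransition(JobState current, JobEventType type);
    void postTransition(JobState before, JobState after, JobEventType type);
}

final class JobStateMachine {
    private final Map<JobState, Map<JobEventType, JobState>> table = new EnumMap<>(JobState.class);
    private final JobListener listener;
    private JobState currentState = JobState.NEW;

    JobStateMachine(JobListener listener) {
        this.listener = listener;
        addTransition(JobState.NEW, JobEventType.START, JobState.RUNNING);
        addTransition(JobState.RUNNING, JobEventType.COMPLETE, JobState.SUCCEEDED);
    }

    private void addTransition(JobState pre, JobEventType type, JobState post) {
        table.computeIfAbsent(pre, k -> new HashMap<>()).put(type, post);
    }

    /** Outer layer, like InternalStateMachine#doTransition: listener before and after. */
    synchronized JobState doTransition(JobEventType type) {
        listener.preTransition(currentState, type);
        JobState oldState = currentState;
        currentState = lookup(oldState, type); // on failure, currentState is left unchanged
        listener.postTransition(oldState, currentState, type);
        return currentState;
    }

    /** Inner layer, like StateMachineFactory#doTransition: two lookups, else throw. */
    private JobState lookup(JobState oldState, JobEventType type) {
        Map<JobEventType, JobState> transitionMap = table.get(oldState);
        if (transitionMap != null) {
            JobState next = transitionMap.get(type);
            if (next != null) {
                return next;
            }
        }
        throw new BadTransitionException(oldState, type);
    }
}
```

As in the code quoted above, when the lookup fails the exception propagates before currentState is reassigned and before postTransition runs, so the machine stays in its old state.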
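What finally runs is the custom transition registered by the user, for example RMAppRecoveredTransition, which processes the event and returns the resulting application state: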
```java
private static final class RMAppRecoveredTransition implements
    MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

  @Override
  public RMAppState transition(RMAppImpl app, RMAppEvent event) {
    RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
    app.recover(recoverEvent.getRMState());
    // The app has completed.
    if (app.recoveredFinalState != null) {
      app.recoverAppAttempts();
      new FinalTransition(app.recoveredFinalState).transition(app, event);
      return app.recoveredFinalState;
    }

    if (UserGroupInformation.isSecurityEnabled()) {
      // asynchronously renew delegation token on recovery.
      try {
        app.rmContext.getDelegationTokenRenewer()
            .addApplicationAsyncDuringRecovery(app.getApplicationId(),
                BuilderUtils.parseCredentials(app.submissionContext),
                app.submissionContext.getCancelTokensWhenComplete(),
                app.getUser(),
                BuilderUtils.parseTokensConf(app.submissionContext));
      } catch (Exception e) {
        String msg = "Failed to fetch user credentials from application:" + e.getMessage();
        app.diagnostics.append(msg);
        LOG.error(msg, e);
      }
    }

    for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts.entrySet()) {
      app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
          timeout.getKey(), timeout.getValue());
      if (LOG.isDebugEnabled()) {
        long remainingTime = timeout.getValue() - app.systemClock.getTime();
        LOG.debug("Application " + app.applicationId
            + " is registered for timeout monitor, type=" + timeout.getKey()
            + " remaining timeout=" + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds");
      }
    }

    // No existent attempts means the attempt associated with this app was not
    // started or started but not yet saved.
    if (app.attempts.isEmpty()) {
      app.scheduler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      return RMAppState.SUBMITTED;
    }

    // Add application to scheduler synchronously to guarantee scheduler
    // knows applications before AM or NM re-registers.
    app.scheduler.handle(
        new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
            app.applicationPriority, app.placementContext));

    // recover attempts
    app.recoverAppAttempts();

    // YARN-1507 is saving the application state after the application is
    // accepted. So after YARN-1507, an app is saved meaning it is accepted.
    // Thus we return ACCEPTED state on recovery.
    return RMAppState.ACCEPTED;
  }
}
```
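Stripped of its side effects, RMAppRecoveredTransition is a decision on what recovery found, and that decision is why it must be a MultipleArcTransition. The sketch keeps only the branching on the return value; RecoveredAppState, RecoveredApp and RecoveryDecision are hypothetical names, and `savedAttempts == 0` stands in for `app.attempts.isEmpty()`. Recovering attempts, renewing delegation tokens, registering timeouts and notifying the scheduler are all omitted.

```java
/** Hypothetical subset of RMAppState relevant to recovery. */
enum RecoveredAppState { SUBMITTED, ACCEPTED, FINISHED, FAILED, KILLED }

/** Hypothetical summary of what recovery read back from the state store. */
final class RecoveredApp {
    final RecoveredAppState recoveredFinalState; // null if the app had not completed before the restart
    final int savedAttempts;

    RecoveredApp(RecoveredAppState recoveredFinalState, int savedAttempts) {
        this.recoveredFinalState = recoveredFinalState;
        this.savedAttempts = savedAttempts;
    }
}

final class RecoveryDecision {
    private RecoveryDecision() {}

    /** Mirrors the three return paths of RMAppRecoveredTransition#transition. */
    static RecoveredAppState decide(RecoveredApp app) {
        if (app.recoveredFinalState != null) {
            return app.recoveredFinalState; // app had completed: restore its final state
        }
        if (app.savedAttempts == 0) {
            return RecoveredAppState.SUBMITTED; // no attempt started, or none saved yet
        }
        return RecoveredAppState.ACCEPTED; // a saved app is an accepted app (YARN-1507)
    }
}
```

Every value this can return is in the EnumSet declared for the NEW --RECOVER--> registration in section 5, which is exactly what MultipleInternalArc checks after the hook runs.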
7. Summary
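StateMachineFactory builds a lookup table from (source state preState, event type eventType) to a state transition method (Transition), which keeps the many different transitions well organized. This lookup table is the state machine.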