Analysis of the YARN Active ResourceManager Startup Framework
Posted by 有山先生
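1. Introduction
As described in the first article of this series, on the design of YARN's Service abstraction, YARN copes with its many responsibilities by wrapping each one in a Service and using CompositeService to manage the init/start/stop lifecycle of child services uniformly. ResourceManager is a CompositeService subclass and follows the same lifecycle. More importantly, when the ResourceManagers start, the Active ResourceManager is elected through ZooKeeper. This article starts from the ResourceManager service startup and works through the Active ResourceManager election framework.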
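2. Which services ResourceManager starts
ResourceManager provides two kinds of services: resident services and active services.
- Resident services run on both ResourceManagers. AdminService and the EmbeddedElector election service, both discussed below, are examples.
- Active services run only on the single Active ResourceManager. They are grouped in RMActiveServices.
How ActiveServices is initialized, started, and managed is analyzed in the following sections; the conclusions are collected in section 6.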
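3. Why ActiveServices is not a ResourceManager sub-service
ActiveServices may run on only one ResourceManager, so its start logic has to differ from that of ResourceManager itself. ResourceManager starts all of its sub-services unconditionally, and at that point it is not yet known which ResourceManager will become active. If ActiveServices were registered as a sub-service, it would be started on both ResourceManagers, which violates the requirement that it run on exactly one.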
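The argument can be illustrated with a minimal sketch. All classes below are hypothetical stand-ins written for this example; they are not Hadoop's CompositeService or ResourceManager, and they only model the "start every registered child" behavior.

```java
import java.util.ArrayList;
import java.util.List;

class SubServiceDemo {
    public static void main(String[] args) {
        SketchResourceManager rm = new SketchResourceManager();
        rm.start(); // starts the registered (resident) children only
        System.out.println("admin started: " + rm.adminService.isStarted());
        System.out.println("active started: " + rm.activeServices.isStarted());
        rm.transitionToActive(); // in YARN this is triggered by winning the election
        System.out.println("active started after election: " + rm.activeServices.isStarted());
    }
}

// Hypothetical composite service: start() starts every registered child unconditionally.
class SketchCompositeService {
    private final List<SketchCompositeService> children = new ArrayList<>();
    private boolean started;

    void addService(SketchCompositeService child) {
        children.add(child);
    }

    void start() {
        for (SketchCompositeService child : children) {
            child.start();
        }
        started = true;
    }

    boolean isStarted() {
        return started;
    }
}

// Hypothetical ResourceManager: the active services are held as a plain field
// and deliberately never passed to addService().
class SketchResourceManager extends SketchCompositeService {
    final SketchCompositeService adminService = new SketchCompositeService();
    final SketchCompositeService activeServices = new SketchCompositeService();

    SketchResourceManager() {
        addService(adminService);
    }

    void transitionToActive() {
        activeServices.start();
    }
}
```

Because activeServices is never registered as a child, calling start() on the manager cannot start it by accident; only the explicit transition does.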
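4. When ActiveServices is started
To elect the Active ResourceManager among the two ResourceManagers, and thereby guarantee a single ActiveServices instance, ResourceManager provides a resident sub-service, EmbeddedElector. EmbeddedElector holds a ZooKeeper connection. When it starts, it competes for a mutually exclusive lock in ZooKeeper. The ResourceManager that obtains the lock switches to the active state and starts ActiveServices; the other one switches to the standby state and keeps only its resident services running. EmbeddedElector also registers a watcher with ZooKeeper, so that when the state of the Active ResourceManager's lock changes, the watcher callback switches the local ResourceManager state accordingly.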
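The idea of "whoever creates the lock node first becomes active" can be sketched without ZooKeeper. The code below is an assumption-laden simplification: SketchLockNode is an in-memory stand-in for the ephemeral lock znode, and SketchElector is a hypothetical class, not Hadoop's elector. In the real system the standby is notified by a watcher when the node disappears; here the caller simply calls joinElection again.

```java
import java.util.concurrent.atomic.AtomicReference;

class ElectionDemo {
    public static void main(String[] args) {
        SketchLockNode lock = new SketchLockNode();
        SketchElector rm1 = new SketchElector("rm1", lock);
        SketchElector rm2 = new SketchElector("rm2", lock);
        System.out.println("rm1 -> " + rm1.joinElection());
        System.out.println("rm2 -> " + rm2.joinElection());
        rm1.sessionLost(); // the ephemeral node vanishes with rm1's session
        System.out.println("rm2 after rm1 session loss -> " + rm2.joinElection());
    }
}

// In-memory stand-in for the lock znode: holds the owner's id, or null when free.
class SketchLockNode {
    private final AtomicReference<String> owner = new AtomicReference<>();

    // Succeeds for exactly one caller while the node does not exist.
    boolean tryCreate(String rmId) {
        return owner.compareAndSet(null, rmId);
    }

    // Models the ephemeral node being removed when its owner's session ends.
    void deleteIfOwnedBy(String rmId) {
        owner.compareAndSet(rmId, null);
    }
}

class SketchElector {
    enum State { INIT, ACTIVE, STANDBY }

    private final String rmId;
    private final SketchLockNode lock;
    private State state = State.INIT;

    SketchElector(String rmId, SketchLockNode lock) {
        this.rmId = rmId;
        this.lock = lock;
    }

    State joinElection() {
        if (state != State.ACTIVE) {
            state = lock.tryCreate(rmId) ? State.ACTIVE : State.STANDBY;
        }
        return state;
    }

    void sessionLost() {
        lock.deleteIfOwnedBy(rmId);
        state = State.STANDBY;
    }
}
```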
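5. ResourceManager sub-service initialization and startup flow
With respect to bringing up the Active ResourceManager, the important sub-services are initialized and started in this order:
- EmbeddedElector election service initialization
- ActiveServices initialization
- EmbeddedElector election service start
- ActiveServices start
The corresponding code is as follows: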
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public static void main(String argv[]) {
    // omitted
    ResourceManager resourceManager = new ResourceManager();
    // Calls AbstractService.init, which in turn calls ResourceManager.serviceInit
    resourceManager.init(conf);
    // Likewise, ends up calling ResourceManager.serviceStart
    resourceManager.start();
    // omitted
  }
}
5.1 ResourceManager initialization flow
ResourceManager#init is inherited from AbstractService#init, which ultimately calls ResourceManager#serviceInit:
public abstract class AbstractService implements Service {
  public void init(Configuration conf) {
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
          + getName() + ": null configuration");
    }
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            //if the service ended up here during init,
            //notify the listeners
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }
}
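The shape of this method is a template method guarded by a state check: init is idempotent, delegates the real work to serviceInit, and stops the service if that work fails. The following is a minimal sketch of that pattern only. SketchService and its subclasses are hypothetical names for this example, and a plain Map stands in for Hadoop's Configuration.

```java
import java.util.HashMap;
import java.util.Map;

class InitDemo {
    public static void main(String[] args) {
        CountingService service = new CountingService();
        Map<String, String> conf = new HashMap<>();
        service.init(conf);
        service.init(conf); // second call returns early without re-running serviceInit
        System.out.println(service.getState() + ", serviceInit calls: " + service.initCount);
    }
}

abstract class SketchService {
    enum State { NOTINITED, INITED, STOPPED }

    private final Object stateChangeLock = new Object();
    private State state = State.NOTINITED;

    final void init(Map<String, String> conf) {
        if (conf == null) {
            throw new IllegalArgumentException("null configuration");
        }
        synchronized (stateChangeLock) {
            if (state == State.INITED) {
                return; // already initialized: nothing to do
            }
            if (state == State.STOPPED) {
                throw new IllegalStateException("cannot init a stopped service");
            }
            state = State.INITED;
            try {
                serviceInit(conf); // subclass hook
            } catch (Exception e) {
                state = State.STOPPED; // a failed init leaves the service stopped
                throw new IllegalStateException("init failed", e);
            }
        }
    }

    protected abstract void serviceInit(Map<String, String> conf) throws Exception;

    State getState() {
        synchronized (stateChangeLock) {
            return state;
        }
    }
}

class CountingService extends SketchService {
    int initCount;

    @Override
    protected void serviceInit(Map<String, String> conf) {
        initCount++;
    }
}

class FailingService extends SketchService {
    @Override
    protected void serviceInit(Map<String, String> conf) throws Exception {
        throw new Exception("simulated init failure");
    }
}
```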
ResourceManager#serviceInit creates the EmbeddedElector service and adds it as a sub-service, and calls ResourceManager#createAndInitActiveServices to create and initialize the separate ActiveServices:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    // Register resident services, for example AdminService
    adminService = createAdminService();
    addService(adminService);
    rmContext.setRMAdminService(adminService);
    // omitted
    if (this.rmContext.isHAEnabled()) {
      // If the RM is configured to use an embedded leader elector,
      // initialize the leader elector.
      if (HAUtil.isAutomaticFailoverEnabled(conf)
          && HAUtil.isAutomaticFailoverEmbedded(conf)) {
        EmbeddedElector elector = createEmbeddedElector();
        addIfService(elector);
        rmContext.setLeaderElectorService(elector);
      }
    }
    // omitted
    createAndInitActiveServices(false);
    // omitted
  }
}
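5.2 Creating the EmbeddedElector election service
ResourceManager chooses the EmbeddedElector implementation based on the yarn.resourcemanager.ha.curator-leader-elector.enabled setting. If it is true, the implementation is CuratorBasedElectorService, which is built on Curator, a ZooKeeper client framework that wraps the native ZooKeeper API. Otherwise the default implementation, ActiveStandbyElectorBasedElectorService, is used; it is built directly on the native ZooKeeper API: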
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected EmbeddedElector createEmbeddedElector() throws IOException {
    EmbeddedElector elector;
    curatorEnabled =
        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    if (curatorEnabled) {
      this.zkManager = createAndStartZKManager(conf);
      elector = new CuratorBasedElectorService(this);
    } else {
      elector = new ActiveStandbyElectorBasedElectorService(this);
    }
    return elector;
  }
}
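5.3 ActiveStandbyElectorBasedElectorService initialization
This article uses ActiveStandbyElectorBasedElectorService to discuss the Active ResourceManager election flow. Its initialization method derives the election znode from the configured base path and the cluster id, so with the default base path the lock path is /yarn-leader-election/<cluster-id>/ActiveStandbyElectorLock. The elector on every ResourceManager tries to create this ephemeral node in ZooKeeper, and the ResourceManager whose creation succeeds is ultimately elected Active ResourceManager.
Most importantly, during initialization ActiveStandbyElectorBasedElectorService creates its ActiveStandbyElector member: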
public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
    // omitted
    String rmId = HAUtil.getRMHAId(conf);
    String clusterId = YarnConfiguration.getClusterId(conf);
    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    String electionZNode = zkBasePath + "/" + clusterId;
    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
    List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
    List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
    // omitted
    // Create the elector object
    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
    elector.ensureParentZNode();
    // omitted
    super.serviceInit(conf);
  }
}
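ActiveStandbyElector connects to the ZooKeeper server, maintains a watcher on the mutually exclusive lock node (/yarn-leader-election/<cluster-id>/ActiveStandbyElectorLock with the default base path), and switches the ResourceManager state according to the state of that node.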
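To make the path composition concrete, here is a small sketch that mirrors the two string concatenations in the excerpts (base path plus cluster id, then the lock file name). The constant values are assumptions taken from Hadoop's defaults as far as I know them, and the cluster id "yarn-cluster" is a made-up example value.

```java
class ElectionPaths {
    // Assumed default of the auto-failover ZooKeeper base path setting.
    static final String DEFAULT_ZK_BASE_PATH = "/yarn-leader-election";
    // Assumed value of ActiveStandbyElector's lock file name constant.
    static final String LOCK_FILENAME = "ActiveStandbyElectorLock";

    // Mirrors: electionZNode = zkBasePath + "/" + clusterId
    static String electionZNode(String zkBasePath, String clusterId) {
        return zkBasePath + "/" + clusterId;
    }

    // Mirrors: zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME
    static String lockPath(String electionZNode) {
        return electionZNode + "/" + LOCK_FILENAME;
    }

    public static void main(String[] args) {
        String parent = electionZNode(DEFAULT_ZK_BASE_PATH, "yarn-cluster");
        System.out.println(lockPath(parent));
    }
}
```

Since the cluster id is part of the path, two YARN clusters sharing one ZooKeeper ensemble compete for different lock nodes.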
5.3.1 ZooKeeper connection
When ActiveStandbyElector is constructed, it creates the connection to ZooKeeper:
public ActiveStandbyElector(String zookeeperHostPorts, int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl, List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app, int maxRetryNum, boolean failFast) throws IOException, HadoopIllegalArgumentException, KeeperException {
  if (app == null || acl == null || parentZnodeName == null
      || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
    throw new HadoopIllegalArgumentException("Invalid argument");
  }
  zkHostPort = zookeeperHostPorts;
  zkSessionTimeout = zookeeperSessionTimeout;
  zkAcl = acl;
  zkAuthInfo = authInfo;
  appClient = app;
  znodeWorkingDir = parentZnodeName;
  zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
  zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
  this.maxRetryNum = maxRetryNum;
  // establish the ZK Connection for future API calls
  if (failFast) {
    createConnection();
  } else {
    reEstablishSession();
  }
}
reEstablishSession wraps createConnection in a layer of error retries, so we look directly at ActiveStandbyElector#createConnection:
private void createConnection() throws IOException, KeeperException {
  // omitted
  zkClient = connectToZooKeeper();
  // omitted
}
ActiveStandbyElector#connectToZooKeeper creates the Watcher object that listens for ZooKeeper events:
protected synchronized ZooKeeper connectToZooKeeper() throws IOException, KeeperException {
  watcher = new WatcherWithClientRef();
  // Register the watcher with ZooKeeper
  ZooKeeper zk = createZooKeeper();
  watcher.setZooKeeperRef(zk);
  // omitted
  watcher.waitForZKConnectionEvent(zkSessionTimeout);
  // omitted
  return zk;
}
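WatcherWithClientRef#process receives ZooKeeper events; the actual handling is delegated to ActiveStandbyElector#processWatchEvent:
private final class WatcherWithClientRef implements Watcher {
  private ZooKeeper zk;
  // Released once the connection event has been received from the ZooKeeper server;
  // only then may other events be handled
  private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
  // Events are handled only after the watcher has been given its ZooKeeper reference
  private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
  // ordinary methods omitted
  // process is the method through which the watcher handles ZooKeeper events
  @Override
  public void process(WatchedEvent event) {
    // omitted
    ActiveStandbyElector.this.processWatchEvent(zk, event);
    // omitted
  }
}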
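The two latches solve an ordering problem: the watcher is handed to the client at construction time, so an event can arrive before the watcher knows which client it belongs to. A minimal sketch of that gating follows. SketchWatcher is a hypothetical class, strings stand in for the ZooKeeper client and the event, and the timeouts are arbitrary values chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class WatcherLatchDemo {
    // Delivers an event on another thread before the client reference is set,
    // then sets the reference and returns what the watcher finally handled.
    static String runDemo() {
        SketchWatcher watcher = new SketchWatcher();
        Thread eventThread = new Thread(() -> watcher.process("SyncConnected"));
        eventThread.start();
        try {
            if (!watcher.waitForConnectionEvent(2000)) {
                throw new IllegalStateException("no connection event received");
            }
            watcher.setClientRef("zk-client-1");
            eventThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return watcher.lastHandled();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}

class SketchWatcher {
    private final CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    private final CountDownLatch hasSetClient = new CountDownLatch(1);
    private volatile String client;
    private volatile String lastHandled;

    void setClientRef(String clientName) {
        client = clientName;
        hasSetClient.countDown();
    }

    // Returns false if no event arrived within the timeout.
    boolean waitForConnectionEvent(long timeoutMs) throws InterruptedException {
        return hasReceivedEvent.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Called on the event thread.
    void process(String event) {
        hasReceivedEvent.countDown(); // signal that the first event has arrived
        try {
            // Wait until the client reference is available before handling the event.
            if (!hasSetClient.await(5, TimeUnit.SECONDS)) {
                return; // reference never arrived; this sketch drops the event
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        lastHandled = client + ":" + event;
    }

    String lastHandled() {
        return lastHandled;
    }
}
```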
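5.3.2 Handling ZooKeeper watch events
ActiveStandbyElector#processWatchEvent handles watch events. How the ResourceManager state is adjusted depends on the ZooKeeper connection state and on the event type.
The handling logic is as follows: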
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private ConnectionState zkConnectionState = ConnectionState.TERMINATED;

  synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    // Handle connection-state events
    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED &&
            wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");
        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }
      return;
    }
    // The watched node has changed
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        if (LOG.isDebugEnabled()) {
          LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        }
        monitorActiveStatus();
      }
      return;
    }
    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }
}
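The connection-state branch is easier to reason about when reduced to a function from (previous connection state, incoming state) to an action. The sketch below does only that. The enum names and the Action values are hypothetical labels for the calls seen in the excerpt, and the SaslAuthenticated and node-event branches are left out.

```java
class WatchEventSketch {
    enum KeeperState { SYNC_CONNECTED, DISCONNECTED, EXPIRED }
    enum ConnectionState { TERMINATED, DISCONNECTED, CONNECTED }
    enum Action { NONE, MONITOR_ACTIVE_STATUS, ENTER_NEUTRAL, ENTER_NEUTRAL_AND_REJOIN }

    private ConnectionState connectionState = ConnectionState.TERMINATED;
    private final boolean wantToBeInElection;

    WatchEventSketch(boolean wantToBeInElection) {
        this.wantToBeInElection = wantToBeInElection;
    }

    // Returns the action the elector would take for a connection-state event.
    Action onConnectionEvent(KeeperState keeperState) {
        switch (keeperState) {
            case SYNC_CONNECTED: {
                ConnectionState previous = connectionState;
                connectionState = ConnectionState.CONNECTED;
                // Only a reconnect after a disconnect needs the lock to be re-checked.
                return (previous == ConnectionState.DISCONNECTED && wantToBeInElection)
                        ? Action.MONITOR_ACTIVE_STATUS
                        : Action.NONE;
            }
            case DISCONNECTED:
                connectionState = ConnectionState.DISCONNECTED;
                return Action.ENTER_NEUTRAL;
            case EXPIRED:
                return Action.ENTER_NEUTRAL_AND_REJOIN;
            default:
                throw new IllegalArgumentException("unexpected state: " + keeperState);
        }
    }

    public static void main(String[] args) {
        WatchEventSketch sketch = new WatchEventSketch(true);
        System.out.println(sketch.onConnectionEvent(KeeperState.SYNC_CONNECTED));
        System.out.println(sketch.onConnectionEvent(KeeperState.DISCONNECTED));
        System.out.println(sketch.onConnectionEvent(KeeperState.SYNC_CONNECTED));
    }
}
```

The first SYNC_CONNECTED yields no action because the previous state is TERMINATED, while the same event after a DISCONNECTED triggers a re-check of the lock.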
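Every branch in ActiveStandbyElector's event handling comes down to moving the ResourceManager into the active, standby, or neutral state. The transitions are discussed below.
5.3.2.1 Competing for the active state
monitorLockNodeAsync asynchronously checks whether the lock node exists and re-registers the watcher on it; the resulting state change is decided in the exists callback: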
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void monitorLockNodeAsync() {
    monitorLockNodePending = true;
    monitorLockNodeClient = zkClient;
    zkClient.exists(zkLockFilePath, watcher, this, zkClient);
  }
}
ActiveStandbyElector implements the callback for exists, which switches the ResourceManager between active and standby depending on who holds the distributed lock:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx, Stat stat) {
    // omitted
    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // If this ResourceManager holds the ZooKeeper lock, enter the active state; otherwise enter standby
      if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
        // enter the active state
        if (!becomeActive()) {
          reJoinElectionAfterFailureToBecomeActive();
        }
      } else {
        // enter the standby state
        becomeStandby();
      }
      return;
    }
    // If the node does not exist, enter the neutral state and try to create the lock
    if (isNodeDoesNotExist(code)) {
      enterNeutralMode();
      // try to create the ZooKeeper lock again
      joinElectionInternal();
      return;
    }
    // omitted
  }
}
5.3.2.2 Neutral state handling
When the connection is lost, the elector enters the NEUTRAL state:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void enterNeutralMode() {
    if (state != State.NEUTRAL) {
      state = State.NEUTRAL;
      appClient.enterNeutralMode();
    }
  }
}
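ActiveStandbyElector#enterNeutralMode calls enterNeutralMode on its appClient member. appClient is in fact the ActiveStandbyElectorBasedElectorService instance, so the call lands in ActiveStandbyElectorBasedElectorService#enterNeutralMode. In the neutral state the ResourceManager has lost its ZooKeeper connection, so the service schedules a timer that moves it to standby after zkSessionTimeout: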
public class ActiveStandbyElectorBasedElectorService extends AbstractService implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void enterNeutralMode() {
    // omitted
    zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
    zkDisconnectTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        synchronized (zkDisconnectLock) {
          becomeStandby();
        }
      }
    }, zkSessionTimeout);
    // omitted
  }
}
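5.3.2.3 Session expiry handling
When the session expires, the elector tears down the connection and re-joins the election to compete for the active state again: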
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void reJoinElection(int sleepTime) {
    sessionReestablishLockForTests.lock();
    try {
      terminateConnection();
      sleepFor(sleepTime);
      if (appData != null) {
        joinElectionInternal();
      }
    } finally {
      sessionReestablishLockForTests.unlock();
    }
  }
}
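Consider the first connection to ZooKeeper. The initial connection state is ConnectionState.TERMINATED. When the client successfully establishes a session with the ZooKeeper server, it receives the state SyncConnected with event type Event.EventType.None. In processWatchEvent the previous state is then TERMINATED rather than DISCONNECTED, so monitorActiveStatus is not called and the code simply breaks out of the switch. In other words, once the client has connected to the server for the first time, no further handling is required.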
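5.4 Active Service initialization
During its own initialization, ResourceManager makes an extra call to initialize ActiveServices. ActiveServices is not a ResourceManager sub-service, so it is not driven by ResourceManager's composite init/start/stop handling: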
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    createAndInitActiveServices(false);
    // omitted
  }
}
ResourceManager#createAndInitActiveServices runs the initialization of activeServices:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  protected void createAndInitActiveServices(boolean fromActive) {
    activeServices = new RMActiveServices(this);
    activeServices.fromActive = fromActive;
    activeServices.init(conf);
  }
}
activeServices is an RMActiveServices, whose initialization creates its child services and adds them:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      standByTransitionRunnable = new StandByTransitionRunnable();
      // omitted
      xxxService = createXxxService();
      addService(xxxService);
      // omitted
      super.serviceInit(conf);
    }
  }
}
5.5 Starting the election service
Once the ActiveStandbyElectorBasedElectorService objects have been created, every ResourceManager has a zkClient connected to the ZooKeeper server. Starting EmbeddedElector follows this call chain:
ActiveStandbyElectorBasedElectorService#serviceStart -> ActiveStandbyElector#joinElection -> ActiveStandbyElector#joinElectionInternal -> ActiveStandbyElector#createLockNodeAsync. ActiveStandbyElector#createLockNodeAsync tries to acquire the Active ResourceManager lock:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
  }
}
createLockNodeAsync calls ZooKeeper#create to try to acquire the distributed lock and thereby enter the active state:
public class ZooKeeper {
  public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) {
    // omitted
  }
}
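StringCallback is an asynchronous callback: after the client sends the create request, the server's response arrives asynchronously and the client handles it in StringCallback#processResult.
ActiveStandbyElector implements this callback interface provided by ZooKeeper, so once the create call completes, ActiveStandbyElector#processResult runs asynchronously: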
public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    // omitted
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      // Try to enter the active state
      if (becomeActive()) {
        // Verify by monitoring the lock node
        monitorActiveStatus();
      } else {
        // Otherwise try again to create the ZooKeeper node and obtain the active state
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }
    // Creation failed because the node already exists: enter the standby state
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        becomeStandby();
      }
      monitorActiveStatus();
      return;
    }
    // Creation failed with a retryable error and the node does not exist yet: retry
    if (shouldRetry(code)) {
      if (createRetryCount < maxRetryNum) {
        ++createRetryCount;
        createLockNodeAsync();
        return;
      }
    }
    // omitted
  }
}
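The three branches of this callback can be summarized as a small decision function. The sketch below is a simplification under stated assumptions: the Code and Outcome enums are hypothetical, CONNECTION_LOSS stands for whatever shouldRetry accepts, and GIVE_UP is a placeholder for the error handling that the excerpt elides. It also ignores the case where becomeActive itself fails.

```java
class CreateResultSketch {
    enum Code { OK, NODE_EXISTS, CONNECTION_LOSS, OTHER }
    enum Outcome { BECOME_ACTIVE, BECOME_STANDBY_AND_MONITOR, MONITOR_ONLY, RETRY_CREATE, GIVE_UP }

    private final int maxRetryNum;
    private int createRetryCount;

    CreateResultSketch(int maxRetryNum) {
        this.maxRetryNum = maxRetryNum;
    }

    Outcome onCreateResult(Code code) {
        switch (code) {
            case OK:
                // We created the lock node, so we are the leader.
                return Outcome.BECOME_ACTIVE;
            case NODE_EXISTS:
                // Without a prior retry, someone else must own the node.
                // After a retry, our own lost first attempt may have created it,
                // so ownership is only verified by monitoring the node.
                return createRetryCount == 0
                        ? Outcome.BECOME_STANDBY_AND_MONITOR
                        : Outcome.MONITOR_ONLY;
            case CONNECTION_LOSS:
                if (createRetryCount < maxRetryNum) {
                    ++createRetryCount;
                    return Outcome.RETRY_CREATE;
                }
                return Outcome.GIVE_UP;
            default:
                return Outcome.GIVE_UP;
        }
    }

    public static void main(String[] args) {
        CreateResultSketch sketch = new CreateResultSketch(2);
        System.out.println(sketch.onCreateResult(Code.CONNECTION_LOSS));
        System.out.println(sketch.onCreateResult(Code.NODE_EXISTS));
    }
}
```

The createRetryCount == 0 check is the interesting design point: a "node exists" answer after a retry is ambiguous, so the decision is deferred to the exists callback, which compares the node's ephemeral owner with the local session id.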
5.6 Starting Active Service
In the normal case, ActiveStandbyElector#becomeActive is called to move the ResourceManager into the active state:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private boolean becomeActive() {
    // omitted
    appClient.becomeActive();
    // omitted
  }
}
appClient is the ActiveStandbyElectorBasedElectorService instance that was passed in when the ActiveStandbyElector was constructed:
public class ActiveStandbyElectorBasedElectorService extends AbstractService
    implements EmbeddedElector,
    ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void becomeActive() throws ServiceFailedException {
    cancelDisconnectTimer();
    try {
      rm.getRMContext().getRMAdminService().transitionToActive(req);
    } catch (Exception e) {
      throw new ServiceFailedException("RM could not transition to Active", e);
    }
  }
}
AdminService#transitionToActive is then called to move the current ResourceManager into the active state:
public class AdminService extends CompositeService implements HAServiceProtocol, ResourceManagerAdministrationProtocol {
  public synchronized void transitionToActive(
      HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
    // omitted
    rm.transitionToActive();
    // omitted
  }
}
AdminService calls ResourceManager#transitionToActive, which in turn calls ResourceManager#startActiveServices to move the ResourceManager into the active state:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToActive() throws Exception {
    if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
      LOG.info("Already in active state");
      return;
    }
    LOG.info("Transitioning to active state");
    this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          startActiveServices();
          return null;
        } catch (Exception e) {
          reinitialize(true);
          throw e;
        }
      }
    });
    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
    LOG.info("Transitioned to active state");
  }
}
ResourceManager#startActiveServices is what actually starts the active services:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  void startActiveServices() throws Exception {
    if (activeServices != null) {
      clusterTimeStamp = System.currentTimeMillis();
      activeServices.start();
    }
  }
}
5.7 Switching to the standby state
When a ResourceManager loses the election it enters the standby state; if it was in the active state at that point, RMActiveServices is stopped:
public class ResourceManager extends CompositeService implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToStandby(boolean initialize) throws Exception {
    if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY) {
      LOG.info("Already in standby state");
      return;
    }
    LOG.info("Transitioning to standby state");
    HAServiceState state = rmContext.getHAServiceState();
    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
    if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
      stopActiveServices();
      reinitialize(initialize);
    }
    LOG.info("Transitioned to standby state");
  }
}
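Both transitions follow the same pattern: return early if already in the target state, and stop the active services only when actually leaving the active state. The following sketch models just that pattern. HaStateSketch is a hypothetical class; the boolean and the counter stand in for startActiveServices, stopActiveServices, and reinitialize, and the INITIALIZING start state is an assumption of this example.

```java
class HaStateSketch {
    enum HaState { INITIALIZING, STANDBY, ACTIVE }

    private HaState state = HaState.INITIALIZING;
    private boolean activeServicesRunning;
    private int reinitializeCount;

    synchronized void transitionToActive() {
        if (state == HaState.ACTIVE) {
            return; // already active: nothing to do
        }
        activeServicesRunning = true; // stands in for startActiveServices()
        state = HaState.ACTIVE;
    }

    synchronized void transitionToStandby() {
        if (state == HaState.STANDBY) {
            return; // already standby: nothing to do
        }
        HaState previous = state;
        state = HaState.STANDBY;
        if (previous == HaState.ACTIVE) {
            activeServicesRunning = false; // stands in for stopActiveServices()
            reinitializeCount++;           // stands in for reinitialize(...)
        }
    }

    synchronized HaState getState() {
        return state;
    }

    synchronized boolean isActiveServicesRunning() {
        return activeServicesRunning;
    }

    synchronized int getReinitializeCount() {
        return reinitializeCount;
    }

    public static void main(String[] args) {
        HaStateSketch rm = new HaStateSketch();
        rm.transitionToStandby();
        rm.transitionToActive();
        rm.transitionToStandby();
        System.out.println(rm.getState() + ", reinitialized " + rm.getReinitializeCount() + " time(s)");
    }
}
```

Making both transitions idempotent matters because the elector callbacks can fire repeatedly, for example when the disconnect timer and a lost election both request standby.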
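6. Summary
RMActiveServices is not a ResourceManager sub-service; its init/start/stop flow is independent of the ResourceManager sub-service flow:
- Startup is driven by the ActiveStandbyElectorBasedElectorService election service.
- State switching is driven by the ZooKeeper watcher.
- Initialization and stopping are done through extra method calls in ResourceManager.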