Flink 1.15 Source Code Analysis: Leader Election
Posted by 宝哥大数据
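Role | Description |
---|---|
LeaderContender (contender) | The component that needs a leader elected, such as the dispatcher or the resource manager. It is called back when it is granted leadership and when its leadership is revoked. |
LeaderElectionService (election service) | The service responsible for the election. (a) It is tied to a LeaderElectionDriver implementation, which can be backed by ZooKeeper, Kubernetes, and so on, and it initializes the driver on startup; (b) it implements the LeaderElectionEventHandler callbacks. |
LeaderElectionDriver (election driver) | The concrete election mechanism. Taking ZooKeeperLeaderElectionDriver as an example, it implements the LeaderLatchListener callback interface, is notified when the leader changes and when the node changes, and then calls the LeaderElectionEventHandler. |
LeaderElectionEventHandler (event handler of the election service) | Leader election callbacks. When the contender is elected leader, the handler calls the LeaderContender's grantLeadership method, which runs the follow-up logic specific to that contender. For example, DispatcherRunner's grantLeadership creates the Dispatcher service and starts the DispatcherBootstrap. |
LeaderRetrievalService (leader retrieval service) | Watches for leader changes and passes them on to listeners. Its implementation resembles LeaderElectionService, but its purpose differs: it receives the leader information and then notifies the listener, a LeaderRetrievalListener. |
LeaderRetriever (listener) | The listener for leader changes; it implements the LeaderRetrievalListener interface. The concrete implementation is RpcGatewayRetriever, which in its callback obtains the leader's information, creates the Akka connection, and generates an AOP-style proxy instance. |
RpcGateway | The RPC gateway. Its implementations wrap the Akka-layer details in an AOP style, so calling a method on the implementation directly performs the Akka communication. |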
I. LeaderContender
The LeaderContender interface is used during leader election and represents a participant competing for leadership.
Its implementations are:
- JobMasterServiceLeadershipRunner
- ResourceManager
- DefaultDispatcherRunner
- WebMonitorEndpoint
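The interface contains two important methods:
1. grantLeadership: the callback invoked when the contender wins the election
2. revokeLeadership: the callback invoked when the contender changes from leader to non-leader
A service that needs to take part in an election passes itself, as the contender, to the leaderElectionService when it starts: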
    @Override
    public void start() throws Exception {
        LOG.debug("Start leadership runner for job {}.", getJobID());
        leaderElectionService.start(this);
    }
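The contender contract described above can be sketched in plain Java. This is a minimal, self-contained model and not Flink's actual classes: `Contender`, `ToyElectionService`, and `ToyRunner` are hypothetical names, and the toy service grants or revokes leadership only when the demo tells it to.

```java
import java.util.UUID;

// Hypothetical stand-in for LeaderContender: only the two callbacks discussed above.
interface Contender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

// Hypothetical toy service: remembers the contender and lets the demo trigger results.
class ToyElectionService {
    private Contender contender;

    void start(Contender contender) {
        if (this.contender != null) {
            throw new IllegalStateException("Contender was already set.");
        }
        this.contender = contender;
    }

    // Stands in for "the election backend decided we won".
    void simulateElectionWon() {
        contender.grantLeadership(UUID.randomUUID());
    }

    // Stands in for "the election backend decided we lost".
    void simulateElectionLost() {
        contender.revokeLeadership();
    }
}

// A component that needs a leader: it registers itself and reacts to the callbacks.
class ToyRunner implements Contender {
    private final ToyElectionService electionService;
    private UUID currentSessionId; // null while not leader

    ToyRunner(ToyElectionService electionService) {
        this.electionService = electionService;
    }

    void start() {
        electionService.start(this); // pass ourselves in as the contender
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        // A real runner would start its leader-only services here.
        currentSessionId = leaderSessionId;
    }

    @Override
    public void revokeLeadership() {
        // A real runner would stop its leader-only services here.
        currentSessionId = null;
    }

    boolean isLeader() {
        return currentSessionId != null;
    }
}

class ContenderDemo {
    public static void main(String[] args) {
        ToyElectionService service = new ToyElectionService();
        ToyRunner runner = new ToyRunner(service);
        runner.start();
        service.simulateElectionWon();
        System.out.println("leader after grant: " + runner.isLeader());
        service.simulateElectionLost();
        System.out.println("leader after revoke: " + runner.isLeader());
    }
}
```

The point of the shape is that the runner never decides by itself whether it is the leader; it only reacts to the two callbacks.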
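II. LeaderElectionService
The leader election service. Its implementation DefaultLeaderElectionService is used as the example here.
2.1 LeaderElectionService
LeaderElectionService provides the methods for taking part in an election and for querying its outcome.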
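The interface listing itself is not reproduced in this article, so the following is only a sketch of the kind of contract being described. The four method names follow the Flink 1.15 interface as far as can be told, and should be verified against the source for your version. The `Contender` type and the `ImmediateElectionService` implementation are hypothetical; the latter is loosely modeled on a non-HA setup, where the only contender is granted leadership as soon as it starts.

```java
import java.util.UUID;

// Hypothetical stand-in for LeaderContender.
interface Contender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

// Sketch of the contract: participation (start/stop/confirm) plus one query.
interface ElectionService {
    void start(Contender contender) throws Exception;
    void stop() throws Exception;
    void confirmLeadership(UUID leaderSessionId, String leaderAddress);
    boolean hasLeadership(UUID leaderSessionId);
}

// Hypothetical single-contender implementation: leadership is granted on start.
class ImmediateElectionService implements ElectionService {
    private Contender contender;
    private UUID issuedSessionId;
    private String confirmedAddress;

    @Override
    public void start(Contender contender) {
        this.contender = contender;
        this.issuedSessionId = UUID.randomUUID();
        contender.grantLeadership(issuedSessionId);
    }

    @Override
    public void stop() {
        if (contender != null) {
            contender.revokeLeadership();
            contender = null;
            issuedSessionId = null;
        }
    }

    @Override
    public void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        // Only accept a confirmation that belongs to the current term.
        if (hasLeadership(leaderSessionId)) {
            confirmedAddress = leaderAddress;
        }
    }

    @Override
    public boolean hasLeadership(UUID leaderSessionId) {
        return contender != null && leaderSessionId.equals(issuedSessionId);
    }

    String confirmedAddress() {
        return confirmedAddress;
    }
}

class ServiceContractDemo {
    public static void main(String[] args) {
        ImmediateElectionService service = new ImmediateElectionService();
        service.start(new Contender() {
            @Override
            public void grantLeadership(UUID id) {
                System.out.println("granted, still leader: " + service.hasLeadership(id));
            }

            @Override
            public void revokeLeadership() {
                System.out.println("revoked");
            }
        });
        service.stop();
    }
}
```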
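2.2 LeaderElectionEventHandler (event handler of the election service)
Leader election callbacks. When the contender is elected leader, the handler calls the LeaderContender's grantLeadership method, which runs the follow-up logic specific to that contender. For example, DispatcherRunner's grantLeadership creates the Dispatcher service and starts the DispatcherBootstrap.
When a contender starts taking part in the election (DefaultLeaderElectionService::start), the service creates a leaderElectionDriver through the election driver factory. This driver factory decouples Flink itself from the details of the ZooKeeper-based CuratorFramework.
The service also passes itself to the leaderElectionDriver as the LeaderElectionEventHandler.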
    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");
        synchronized (lock) {
            running = true;
            leaderContender = contender;
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
        }
    }
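The decoupling described above can be illustrated with a small sketch: the service knows only a factory and a handler interface, and the backend-specific driver is created at start time with the service itself as the handler. All types here (`EventHandler`, `Driver`, `DriverFactory`, `InMemoryDriver`, `SketchElectionService`) are hypothetical simplifications and not Flink classes, and the in-memory driver grants leadership immediately in place of a real ZooKeeper or Kubernetes election.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Events a driver reports back to the service (hypothetical).
interface EventHandler {
    void onGrantLeadership(UUID sessionId);
    void onRevokeLeadership();
}

// Backend-specific election mechanism (hypothetical).
interface Driver {
    void close();
}

// The only thing the service knows about the backend (hypothetical).
interface DriverFactory {
    Driver createDriver(EventHandler handler, String contenderDescription);
}

// Stand-in backend: "wins" as soon as it is created, reports the loss on close.
class InMemoryDriver implements Driver {
    private final EventHandler handler;

    InMemoryDriver(EventHandler handler) {
        this.handler = handler;
        handler.onGrantLeadership(UUID.randomUUID());
    }

    @Override
    public void close() {
        handler.onRevokeLeadership();
    }
}

// The service is itself the handler, so it passes `this` to the factory.
class SketchElectionService implements EventHandler {
    private final DriverFactory factory;
    private final List<String> events = new ArrayList<>();
    private Driver driver;

    SketchElectionService(DriverFactory factory) {
        this.factory = factory;
    }

    void start(String contenderDescription) {
        driver = factory.createDriver(this, contenderDescription);
    }

    void stop() {
        driver.close();
    }

    @Override
    public void onGrantLeadership(UUID sessionId) {
        events.add("granted");
    }

    @Override
    public void onRevokeLeadership() {
        events.add("revoked");
    }

    List<String> events() {
        return events;
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        // Swapping the backend means swapping this one factory.
        DriverFactory factory = (handler, description) -> new InMemoryDriver(handler);
        SketchElectionService service = new SketchElectionService(factory);
        service.start("demo-contender");
        service.stop();
        System.out.println(service.events());
    }
}
```

Because the service depends only on `DriverFactory`, a different backend can be plugged in without touching the service code, which is the benefit the factory is meant to provide.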
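III. LeaderElectionDriver
LeaderElectionDriver is the concrete election mechanism. Taking ZooKeeperLeaderElectionDriver as an example, it implements the LeaderLatchListener callback interface, is notified when the leader changes and when the node changes, and then calls the LeaderElectionEventHandler.
The implementation ZooKeeperLeaderElectionDriver is used as the example here.
The driver is created through a factory: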
    // Created through the factory
    @Override
    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler, // the leaderElectionService object acts as the leaderEventHandler
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        return new ZooKeeperLeaderElectionDriver(
                client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
    }
ZooKeeperLeaderElectionDriver wraps CuratorFramework and uses it as the election framework:
    /**
     * Creates a ZooKeeperLeaderElectionDriver object.
     *
     * @param client Client which is connected to the ZooKeeper quorum
     * @param path ZooKeeper node path for the leader election
     * @param leaderElectionEventHandler Event handler for processing leader change events
     * @param fatalErrorHandler Fatal error handler
     * @param leaderContenderDescription Leader contender description
     */
    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String path,
            LeaderElectionEventHandler leaderElectionEventHandler, // DefaultLeaderElectionService
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        checkNotNull(path);
        this.client = checkNotNull(client);
        this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
        leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
        leaderLatch = new LeaderLatch(client, leaderLatchPath);
        this.cache =
                ZooKeeperUtils.createTreeCache(
                        client,
                        connectionInformationPath,
                        this::retrieveLeaderInformationFromZooKeeper);
        running = true;
        // Start the election
        leaderLatch.addListener(this); // register this driver as the LeaderLatchListener
        leaderLatch.start();
        cache.start();
        client.getConnectionStateListenable().addListener(listener);
    }
    // org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch#start
    public void start() throws Exception {
        Preconditions.checkState(this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED), "Cannot be started more than once");
        this.startTask.set(AfterConnectionEstablished.execute(this.client, new Runnable() {
            public void run() {
                try {
                    LeaderLatch.this.internalStart();
                } finally {
                    LeaderLatch.this.startTask.set((Object)null);
                }
            }
        }));
    }
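3.1 LeaderLatchListener
    public interface LeaderLatchListener {
        // Callback invoked when this participant wins the election
        void isLeader();
        // Callback invoked when this participant loses leadership
        void notLeader();
    }
Callback implementation: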
    @Override
    public void isLeader() {
        // In org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start,
        // DefaultLeaderElectionService passes itself in as the leaderElectionEventHandler,
        // so when the election is won here, this callback notifies DefaultLeaderElectionService of the new leadership.
        leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
    }
    @Override
    public void notLeader() {
        leaderElectionEventHandler.onRevokeLeadership();
    }
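To make the callback chain concrete, here is a minimal sketch of a driver that implements a latch-listener interface and forwards each result to its handler, generating a fresh session ID per grant as in the `isLeader()` implementation above. `LatchListener`, `EventHandler`, `SketchDriver`, and `RecordingHandler` are hypothetical stand-ins; no ZooKeeper or Curator is involved, and the demo calls the listener methods by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Stand-in for a latch listener with the same two callbacks (hypothetical).
interface LatchListener {
    void isLeader();
    void notLeader();
}

// Stand-in for the event handler the driver reports to (hypothetical).
interface EventHandler {
    void onGrantLeadership(UUID sessionId);
    void onRevokeLeadership();
}

// The driver translates backend callbacks into handler events.
class SketchDriver implements LatchListener {
    private final EventHandler handler;

    SketchDriver(EventHandler handler) {
        this.handler = handler;
    }

    @Override
    public void isLeader() {
        // Every leadership term gets its own randomly generated session ID.
        handler.onGrantLeadership(UUID.randomUUID());
    }

    @Override
    public void notLeader() {
        handler.onRevokeLeadership();
    }
}

// Records what it receives so the demo can inspect it.
class RecordingHandler implements EventHandler {
    final List<UUID> grantedIds = new ArrayList<>();
    int revocations = 0;

    @Override
    public void onGrantLeadership(UUID sessionId) {
        grantedIds.add(sessionId);
    }

    @Override
    public void onRevokeLeadership() {
        revocations++;
    }
}

class DriverCallbackDemo {
    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        SketchDriver driver = new SketchDriver(handler);
        driver.isLeader();  // first term
        driver.notLeader(); // leadership lost
        driver.isLeader();  // second term, new session ID
        System.out.println("terms: " + handler.grantedIds.size());
        System.out.println("revocations: " + handler.revocations);
    }
}
```

A per-term session ID gives downstream code a way to tell messages from an earlier leadership term apart from current ones, which is presumably why the ID is generated at the moment of the grant.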
DefaultLeaderElectionService reacts to the successful election by calling back leaderContender.grantLeadership:
    @Override
    @GuardedBy("lock")
    public void onGrantLeadership(UUID newLeaderSessionId) {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionID = newLeaderSessionId;
                clearConfirmedLeaderInformation();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Grant leadership to contender {} with session ID {}.",
                            leaderContender.getDescription(),
                            issuedLeaderSessionID);
                }
                // Main logic: notify the leaderContender
                leaderContender.grantLeadership(issuedLeaderSessionID);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the grant leadership notification since the {} has "
                                    + "already been closed.",
                            leaderElectionDriver);
                }
            }
        }
    }
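The `running` guard in this method can be exercised in isolation. The sketch below is a stripped-down model with hypothetical names (`GuardedService`, `Contender`); it shows that a grant arriving after the service has been stopped is dropped instead of reaching the contender.

```java
import java.util.UUID;

// Hypothetical contender with just the grant callback.
interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

// Hypothetical service that keeps only the state needed for the guard.
class GuardedService {
    private final Object lock = new Object();
    private final Contender contender;
    private boolean running = true;
    private UUID issuedSessionId;

    GuardedService(Contender contender) {
        this.contender = contender;
    }

    void stop() {
        synchronized (lock) {
            running = false;
            issuedSessionId = null;
        }
    }

    void onGrantLeadership(UUID newSessionId) {
        synchronized (lock) {
            if (running) {
                issuedSessionId = newSessionId;
                contender.grantLeadership(issuedSessionId);
            }
            // Otherwise: a late notification after shutdown, deliberately ignored.
        }
    }

    UUID issuedSessionId() {
        synchronized (lock) {
            return issuedSessionId;
        }
    }
}

class GuardDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        GuardedService service = new GuardedService(id -> calls[0]++);
        service.onGrantLeadership(UUID.randomUUID()); // delivered
        service.stop();
        service.onGrantLeadership(UUID.randomUUID()); // ignored
        System.out.println("contender callbacks: " + calls[0]);
    }
}
```

Checking the flag and invoking the contender under the same lock means a concurrent `stop()` cannot slip in between the check and the callback.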
Taking WebMonitorEndpoint as an example:
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        log.info(
                "{} was granted leadership with leaderSessionID={}",
                getRestBaseUrl(),
                leaderSessionID);
        leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
    }
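The grant-then-confirm handshake shown for WebMonitorEndpoint can be sketched end to end. In this hypothetical model, `ConfirmingService`, `Contender`, and `RestEndpointSketch` are illustrative names and the URL is a made-up placeholder. The service issues a session ID, the contender answers with `confirmLeadership`, and the service records the address only if the ID matches the one it issued. Treat the matching rule as an assumption about the intent, not a copy of Flink's implementation.

```java
import java.util.UUID;

// Hypothetical contender with just the grant callback.
interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

// Hypothetical service that issues session IDs and accepts confirmations.
class ConfirmingService {
    private Contender contender;
    private UUID issuedSessionId;
    private String confirmedAddress;

    void start(Contender contender) {
        this.contender = contender;
    }

    // Called by the election backend in a real system; called by hand in this sketch.
    void onGrantLeadership(UUID newSessionId) {
        issuedSessionId = newSessionId;
        confirmedAddress = null; // forget the previous term's confirmation
        contender.grantLeadership(newSessionId);
    }

    void confirmLeadership(UUID sessionId, String address) {
        if (sessionId.equals(issuedSessionId)) {
            confirmedAddress = address;
        }
        // Otherwise: confirmation for an outdated term, ignored.
    }

    String confirmedAddress() {
        return confirmedAddress;
    }
}

// Contender that confirms with its own address as soon as it is granted leadership.
class RestEndpointSketch implements Contender {
    private final ConfirmingService service;
    private final String restBaseUrl;

    RestEndpointSketch(ConfirmingService service, String restBaseUrl) {
        this.service = service;
        this.restBaseUrl = restBaseUrl;
    }

    void start() {
        service.start(this);
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        service.confirmLeadership(leaderSessionId, restBaseUrl);
    }
}

class ConfirmDemo {
    public static void main(String[] args) {
        ConfirmingService service = new ConfirmingService();
        // Placeholder address, chosen only for the demo.
        RestEndpointSketch endpoint = new RestEndpointSketch(service, "http://localhost:8081");
        endpoint.start();
        service.onGrantLeadership(UUID.randomUUID());
        System.out.println("confirmed address: " + service.confirmedAddress());
    }
}
```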
References:
https://www.modb.pro/db/107324
https://blog.csdn.net/qq_44836294/article/details/108022739