How Flink's High Availability Works
This post is an update to an earlier write-up on Flink's high-availability internals, revised for the current Flink version.
1. ZooKeeper-based high availability
1.1 Creating the high-availability services
ClusterEntrypoint.java
haServices = createHaServices(configuration, ioExecutor, rpcSystem);
From this we can see that in the current Flink version, when the ZooKeeper-based HA mode is used, the concrete implementation behind haServices is ZooKeeperMultipleComponentLeaderElectionHaServices.
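For orientation, the choice of HA implementation is driven by the high-availability option in flink-conf.yaml. The snippet below is only a hedged sketch of that dispatch, not the body of createHaServices(): HighAvailabilityMode and HighAvailabilityOptions are real Flink classes, while the HaModeProbe class and the println calls merely stand in for the actual construction of the services.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

// Hedged illustration of the dispatch performed when the cluster entrypoint
// creates its HA services: the "high-availability" option decides which
// HighAvailabilityServices implementation is built.
public class HaModeProbe {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, "zookeeper");

        switch (HighAvailabilityMode.fromConfig(configuration)) {
            case ZOOKEEPER:
                // This branch ends up with ZooKeeperMultipleComponentLeaderElectionHaServices
                // in current Flink versions.
                System.out.println("ZooKeeper-backed HA services");
                break;
            case FACTORY_CLASS:
                // A user-supplied HighAvailabilityServicesFactory (e.g. the Kubernetes one).
                System.out.println("HA services from a custom factory");
                break;
            default:
                // NONE: standalone services without leader election.
                System.out.println("Non-HA standalone services");
        }
    }
}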
1.2 The leader election service
Step into the following call (still in ClusterEntrypoint):
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
resourceId.unwrap(),
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
delegationTokenManager,
metricRegistry,
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
The create() call is implemented in DefaultDispatcherResourceManagerComponentFactory.java.
1.2.1 Creating the leader election service & the HA services factory
DefaultDispatcherResourceManagerComponentFactory.java
webMonitorEndpoint =
restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
fatalErrorHandler);
highAvailabilityServices.getClusterRestEndpointLeaderElectionService()
The interesting argument is this one; here highAvailabilityServices is an instance of ZooKeeperMultipleComponentLeaderElectionHaServices.
Step into getClusterRestEndpointLeaderElectionService():
AbstractHaServices.java
public LeaderElectionService getClusterRestEndpointLeaderElectionService()
return createLeaderElectionService(getLeaderPathForRestServer());
Step into createLeaderElectionService(getLeaderPathForRestServer()):
ZooKeeperMultipleComponentLeaderElectionHaServices.java
protected LeaderElectionService createLeaderElectionService(String leaderName)
return new DefaultLeaderElectionService(
getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName));
So for the webMonitorEndpoint object, the concrete leaderElectionService implementation is a DefaultLeaderElectionService instance.
Step into getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName):
DefaultMultipleComponentLeaderElectionService.java
public LeaderElectionDriverFactory createDriverFactory(String componentId)
return new MultipleComponentLeaderElectionDriverAdapterFactory(componentId, this);
Step into new DefaultLeaderElectionService(getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName)):
DefaultLeaderElectionService.java
public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory)
this.leaderElectionDriverFactory = checkNotNull(leaderElectionDriverFactory);
this.leaderContender = null;
this.issuedLeaderSessionID = null;
this.leaderElectionDriver = null;
this.confirmedLeaderInformation = LeaderInformation.empty();
this.running = false;
So inside DefaultLeaderElectionService, the member variable leaderElectionDriverFactory is an instance of MultipleComponentLeaderElectionDriverAdapterFactory.
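The naming already hints at the design: instead of running one ZooKeeper election per component (Dispatcher, ResourceManager, REST endpoint), the newer HA services run a single election per JobManager process and fan the result out to each registered component by componentId. The following plain-Java sketch illustrates that multiplexing idea; MultiplexedLeaderElection and ComponentHandler are invented names for illustration only and are not Flink classes.

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: one election result shared by many components.
final class MultiplexedLeaderElection {

    interface ComponentHandler {
        void onGrantLeadership(UUID leaderSessionId);
        void onRevokeLeadership();
    }

    private final Map<String, ComponentHandler> handlers = new ConcurrentHashMap<>();
    private volatile UUID currentLeaderSessionId; // null while this process is not the leader

    // Each component (dispatcher, resourcemanager, rest_server, ...) registers exactly once.
    void register(String componentId, ComponentHandler handler) {
        if (handlers.putIfAbsent(componentId, handler) != null) {
            throw new IllegalArgumentException("Duplicate registration under " + componentId);
        }
        // Late registration: if leadership has already been granted, replay the grant
        // (the real service does the same replay, as shown later in registerLeaderElectionEventHandler).
        UUID sessionId = currentLeaderSessionId;
        if (sessionId != null) {
            handler.onGrantLeadership(sessionId);
        }
    }

    // Called once by the single ZooKeeper-backed election when this process wins.
    void grantLeadership() {
        UUID sessionId = UUID.randomUUID();
        currentLeaderSessionId = sessionId;
        handlers.values().forEach(h -> h.onGrantLeadership(sessionId));
    }

    // Called once when leadership is lost; every component steps down together.
    void revokeLeadership() {
        currentLeaderSessionId = null;
        handlers.values().forEach(ComponentHandler::onRevokeLeadership);
    }
}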
1.2.2 Election & writing to ZooKeeper
DefaultDispatcherResourceManagerComponentFactory.java
webMonitorEndpoint.start();
Step into webMonitorEndpoint.start():
RestServerEndpoint.java
public final void start() throws Exception
synchronized (lock)
Preconditions.checkState(
state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
log.info("Starting rest endpoint.");
final Router router = new Router();
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
handlers = initializeHandlers(restAddressFuture);
/* sort the handlers such that they are ordered the following:
* /jobs
* /jobs/overview
* /jobs/:jobid
* /jobs/:jobid/config
* /:*
*/
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
checkAllEndpointsAndHandlersAreUnique(handlers);
handlers.forEach(handler -> registerHandler(router, handler, log));
ChannelInitializer<SocketChannel> initializer =
new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws ConfigurationException
RouterHandler handler = new RouterHandler(router, responseHeaders);
// SSL should be the first handler in the pipeline
if (isHttpsEnabled())
ch.pipeline()
.addLast(
"ssl",
new RedirectingSslHandler(
restAddress,
restAddressFuture,
sslHandlerFactory));
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(
new FlinkHttpObjectAggregator(
maxContentLength, responseHeaders));
for (InboundChannelHandlerFactory factory :
inboundChannelHandlerFactories)
Optional<ChannelHandler> channelHandler =
factory.createHandler(configuration, responseHeaders);
if (channelHandler.isPresent())
ch.pipeline().addLast(channelHandler.get());
ch.pipeline()
.addLast(new ChunkedWriteHandler())
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));
;
NioEventLoopGroup bossGroup =
new NioEventLoopGroup(
1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup =
new NioEventLoopGroup(
0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
Iterator<Integer> portsIterator;
try
portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
catch (IllegalConfigurationException e)
throw e;
catch (Exception e)
throw new IllegalArgumentException(
"Invalid port range definition: " + restBindPortRange);
int chosenPort = 0;
while (portsIterator.hasNext())
try
chosenPort = portsIterator.next();
final ChannelFuture channel;
if (restBindAddress == null)
channel = bootstrap.bind(chosenPort);
else
channel = bootstrap.bind(restBindAddress, chosenPort);
serverChannel = channel.syncUninterruptibly().channel();
break;
catch (final Exception e)
// syncUninterruptibly() throws checked exceptions via Unsafe
// continue if the exception is due to the port being in use, fail early
// otherwise
if (!(e instanceof java.net.BindException))
throw e;
if (serverChannel == null)
throw new BindException(
"Could not start rest endpoint on any port in port range "
+ restBindPortRange);
log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress())
advertisedAddress = this.restAddress;
else
advertisedAddress = bindAddress.getAddress().getHostAddress();
port = bindAddress.getPort();
log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
restAddressFuture.complete(restBaseUrl);
state = State.RUNNING;
startInternal();
Step into startInternal():
WebMonitorEndpoint.java
public void startInternal() throws Exception
leaderElectionService.start(this);
startExecutionGraphCacheCleanupTask();
if (hasWebUI)
log.info("Web frontend listening at {}.", getRestBaseUrl());
Step into leaderElectionService.start(this):
DefaultLeaderElectionService.java
public final void start(LeaderContender contender) throws Exception
checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");
synchronized (lock)
running = true;
leaderContender = contender;
leaderElectionDriver =
leaderElectionDriverFactory.createLeaderElectionDriver(
this,
new LeaderElectionFatalErrorHandler(),
leaderContender.getDescription());
LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
Step into leaderElectionDriverFactory.createLeaderElectionDriver():
MultipleComponentLeaderElectionDriverAdapterFactory.java
public LeaderElectionDriver createLeaderElectionDriver(
LeaderElectionEventHandler leaderEventHandler,
FatalErrorHandler fatalErrorHandler,
String leaderContenderDescription)
throws Exception
return new MultipleComponentLeaderElectionDriverAdapter(
leaderName, singleLeaderElectionService, leaderEventHandler);
Step into new MultipleComponentLeaderElectionDriverAdapter(leaderName, singleLeaderElectionService, leaderEventHandler):
MultipleComponentLeaderElectionDriverAdapter.java
MultipleComponentLeaderElectionDriverAdapter(
String componentId,
MultipleComponentLeaderElectionService multipleComponentLeaderElectionService,
LeaderElectionEventHandler leaderElectionEventHandler)
this.componentId = Preconditions.checkNotNull(componentId);
this.multipleComponentLeaderElectionService =
Preconditions.checkNotNull(multipleComponentLeaderElectionService);
multipleComponentLeaderElectionService.registerLeaderElectionEventHandler(
this.componentId, leaderElectionEventHandler);
Step into multipleComponentLeaderElectionService.registerLeaderElectionEventHandler(this.componentId, leaderElectionEventHandler):
DefaultMultipleComponentLeaderElectionService.java
public void registerLeaderElectionEventHandler(
String componentId, LeaderElectionEventHandler leaderElectionEventHandler)
synchronized (lock)
Preconditions.checkArgument(
!leaderElectionEventHandlers.containsKey(componentId),
"Do not support duplicate LeaderElectionEventHandler registration under %s",
componentId);
leaderElectionEventHandlers.put(componentId, leaderElectionEventHandler);
if (currentLeaderSessionId != null)
final UUID leaderSessionId = currentLeaderSessionId;
leadershipOperationExecutor.execute(
() -> leaderElectionEventHandler.onGrantLeadership(leaderSessionId));
Step into leaderElectionEventHandler.onGrantLeadership(leaderSessionId):
DefaultLeaderElectionService.java
public void onGrantLeadership(UUID newLeaderSessionId)
synchronized (lock)
if (running)
issuedLeaderSessionID = newLeaderSessionId;
clearConfirmedLeaderInformation();
if (LOG.isDebugEnabled())
LOG.debug(
"Grant leadership to contender {} with session ID {}.",
leaderContender.getDescription(),
issuedLeaderSessionID);
leaderContender.grantLeadership(issuedLeaderSessionID);
else
if (LOG.isDebugEnabled())
LOG.debug(
"Ignoring the grant leadership notification since the {} has "
+ "already been closed.",
leaderElectionDriver);
Step into leaderContender.grantLeadership(issuedLeaderSessionID):
WebMonitorEndpoint.java
public void grantLeadership(final UUID leaderSessionID)
log.info(
"{} was granted leadership with leaderSessionID={}",
getRestBaseUrl(),
leaderSessionID);
leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
Step into leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl()):
DefaultLeaderElectionService.java
public void confirmLeadership(UUID leaderSessionID, String leaderAddress)
if (LOG.isDebugEnabled())
LOG.debug(
"Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress);
checkNotNull(leaderSessionID);
synchronized (lock)
if (hasLeadership(leaderSessionID))
if (running)
confirmLeaderInformation(leaderSessionID, leaderAddress);
else
if (LOG.isDebugEnabled())
LOG.debug(
"Ignoring the leader session Id {} confirmation, since the "
+ "LeaderElectionService has already been stopped.",
leaderSessionID);
else
// Received an old confirmation call
if (!leaderSessionID.equals(this.issuedLeaderSessionID))
if (LOG.isDebugEnabled())
LOG.debug(
"Receive an old confirmation call of leader session ID {}, "
+ "current issued session ID is {}",
leaderSessionID,
issuedLeaderSessionID);
else
LOG.warn(
"The leader session ID {} was confirmed even though the "
+ ...
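Putting the chain together: WebMonitorEndpoint acts as a LeaderContender; its DefaultLeaderElectionService registers an adapter with the shared multi-component election service; once the process holds leadership, onGrantLeadership is delivered (or replayed) to the handler, the contender is granted leadership, and it confirms with its REST address, which is then published as the leader information in ZooKeeper. The sketch below shows a minimal contender following the same pattern. It is a hedged illustration: grantLeadership, confirmLeadership and getDescription appear in the walkthrough above, while revokeLeadership and handleError are assumed from the usual LeaderContender contract and may differ between Flink versions.

import java.util.UUID;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;

// Hedged sketch of a contender wired to a LeaderElectionService, mirroring what
// WebMonitorEndpoint.startInternal() does: start the election, then confirm
// leadership with this component's address once it is granted.
public class DemoContender implements LeaderContender {

    private final LeaderElectionService leaderElectionService;
    private final String address;

    public DemoContender(LeaderElectionService leaderElectionService, String address) {
        this.leaderElectionService = leaderElectionService;
        this.address = address;
    }

    public void start() throws Exception {
        // Same pattern as leaderElectionService.start(this) in startInternal().
        leaderElectionService.start(this);
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        // Confirming publishes (sessionId, address); with ZooKeeper HA this is what
        // eventually lands in the leader znode.
        leaderElectionService.confirmLeadership(leaderSessionID, address);
    }

    @Override
    public void revokeLeadership() {
        // Assumption: stop acting as leader when leadership is lost.
    }

    @Override
    public String getDescription() {
        return address;
    }

    @Override
    public void handleError(Exception exception) {
        // Assumption: fatal-error callback of the contender contract.
        throw new RuntimeException(exception);
    }
}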
2. Flink installation & deployment: Local mode (overview), Standalone cluster mode, Standalone-HA high-availability cluster mode (principles | setup | testing)
This part is based on the 黑马程序员 Flink 1.12 tutorial videos (2021).
2. Flink installation & deployment
2.1. Local mode (overview)
2.1.1. Principle
2.1.2. Setup
2.1.3. Test
2.2. Standalone cluster mode
2.2.1. Principle
2.2.2. Setup
2.2.3. Test
2.3. Standalone-HA high-availability cluster mode
2.3.1. Principle
2.3.2. Setup
2.3.3. Test
2. Flink installation & deployment
2.1. Local mode (overview)
2.1.1. Principle
1. The Flink program is submitted by the JobClient.
2. The JobClient submits the job to the JobManager.
3. The JobManager coordinates resource allocation and job execution. Once resources are allocated, the task is handed to the appropriate TaskManager.
4. The TaskManager starts a thread to execute the task and reports state changes back to the JobManager, such as started, in progress, or finished.
5. When the job has finished, the result is sent back to the client (JobClient). A condensed word-count job that goes through exactly this flow is sketched right after this list.
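The WordCount.jar used in the tests below follows this flow. For reference, here is a condensed Java DataSet version of that word count in the style of the Flink 1.12 batch examples; the input and output paths are placeholders.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Condensed batch WordCount, mirroring the bundled examples/batch/WordCount.jar.
public class WordCount {

    public static void main(String[] args) throws Exception {
        // The "JobClient" side: build the program, submit it on execute().
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> counts =
                env.readTextFile("/root/words.txt")   // placeholder input path
                        .flatMap(new Tokenizer())
                        .groupBy(0)                   // group by the word
                        .sum(1);                      // sum the per-word counts

        counts.writeAsCsv("/root/out", "\n", " ");    // placeholder output path
        env.execute("WordCount");                     // hand the job to the JobManager
    }

    // Splits each line into (word, 1) pairs.
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}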
2.1.2. Setup
1. Download the release
https://archive.apache.org/dist/flink/
2. Upload flink-1.12.0-bin-scala_2.12.tgz to the target directory on node1
3. Extract it
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
4. If you run into permission problems, fix the ownership
chown -R root:root /export/server/flink-1.12.0
5. Rename the directory or create a symlink
mv flink-1.12.0 flink
ln -s /export/server/flink-1.12.0 /export/server/flink
2.1.3. Test
1. Prepare the file /root/words.txt
vim /root/words.txt
hello me you her
hello me you
hello me
hello
2. Start the local Flink "cluster"
/export/server/flink/bin/start-cluster.sh
3. jps should now show the following two processes
- TaskManagerRunner
- StandaloneSessionClusterEntrypoint
4. Open the Flink Web UI
http://node1:8081/#/overview
A slot in Flink can be thought of as a resource group: Flink parallelizes a program by splitting tasks into subtasks and assigning those subtasks to slots (see the parallelism sketch after these steps).
5. Run the bundled example
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out
6. Stop Flink
/export/server/flink/bin/stop-cluster.sh
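As promised in the slot note above, here is a small, hedged DataStream sketch of how a job's parallelism relates to slots: every operator is split into as many parallel subtasks as its parallelism, and each subtask needs a slot to run in. The ParallelismDemo class name is made up for illustration; the API calls are the standard StreamExecutionEnvironment ones.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Small sketch relating parallelism to slots: a job with parallelism 2 runs two
// subtasks per operator, and each subtask occupies one slot on some TaskManager.
public class ParallelismDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism for every operator of this job.
        env.setParallelism(2);

        env.fromElements("hello me you her", "hello me you", "hello me", "hello")
                .map(String::toUpperCase)
                .print()
                .setParallelism(1); // a single operator can override the job default

        env.execute("parallelism demo");
    }
}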
Start the interactive Scala shell (note: none of the Scala 2.12 builds currently support the Scala shell)
/export/server/flink/bin/start-scala-shell.sh local
Run the following command:
benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
Exit the shell
:quit
2.2. Standalone cluster mode
2.2.1. Principle
2.2.2. Setup
1. Cluster plan
- Server node1 (master + worker): JobManager + TaskManager
- Server node2 (worker): TaskManager
- Server node3 (worker): TaskManager
2. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
# history server
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
3. Edit the masters file
vim /export/server/flink/conf/masters
node1:8081
4. Edit the workers file (called slaves in older Flink versions)
vim /export/server/flink/conf/workers
node1
node2
node3
5. Add the HADOOP_CONF_DIR environment variable
vim /etc/profile
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
6. Distribute Flink and the profile to the other nodes
scp -r /export/server/flink node2:/export/server/flink
scp -r /export/server/flink node3:/export/server/flink
scp /etc/profile node2:/etc/profile
scp /etc/profile node3:/etc/profile
or
for i in {2..3}; do scp -r flink node$i:$PWD; done
7. Source the profile
source /etc/profile
2.2.3. Test
1. Start the cluster by running the following on node1
/export/server/flink/bin/start-cluster.sh
or start the daemons individually
/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all
2. Start the history server
/export/server/flink/bin/historyserver.sh start
3. Check with jps or open the Flink UIs
http://node1:8081/#/overview
http://node1:8082/#/overview
4. Run the bundled example job
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
5. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
2.3. Standalone-HA high-availability cluster mode
2.3.1. Principle
2.3.2. Setup
1. Cluster plan
- Server node1 (master + worker): JobManager + TaskManager
- Server node2 (master + worker): JobManager + TaskManager
- Server node3 (worker): TaskManager
2. Start ZooKeeper
zkServer.sh status
zkServer.sh stop
zkServer.sh start
3. Start HDFS
/export/server/hadoop/sbin/start-dfs.sh
4. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
5. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
and add the following:
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
6. Edit the masters file so that it lists both masters, node1:8081 and node2:8081
vim /export/server/flink/conf/masters
7. Sync the changes to the other nodes
scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
8. On node2, edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2
9. Restart the Flink cluster (run on node1)
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
10. Check with jps
No Flink processes have been started.
11. Check the log
cat /export/server/flink/log/flink-root-standalonesession-0-node1.log
The log shows an error complaining that the Hadoop/HDFS classes cannot be found: since Flink 1.8, the official Flink distribution no longer bundles the HDFS integration jar.
12. Download the Hadoop shaded jar, place it into Flink's lib directory, and distribute it so that Flink can work with Hadoop
Download page:
https://flink.apache.org/downloads.html
13. Put it into the lib directory
cd /export/server/flink/lib
14. Distribute it
for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done
15. Restart the Flink cluster (run on node1)
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
16. Check with jps again: the expected Flink processes are now running on all three machines.
2.3.3. Test
1. Open the Web UIs
http://node1:8081/#/job-manager/config
http://node2:8081/#/job-manager/config
2. Run the WordCount job
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
3. Kill one of the masters
4. Run WordCount again: it still executes successfully
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
5. Stop the cluster
/export/server/flink/bin/stop-cluster.sh