How Flink's High Availability Works


This article is an update of an older write-up on the internals of Flink's high availability.

1. ZooKeeper-Based High Availability

1.1 Creating the High-Availability Services

ClusterEntrypoint.java

haServices = createHaServices(configuration, ioExecutor, rpcSystem);

As this shows, in the current Flink version, when the ZooKeeper-based HA mode is used, the concrete implementation class behind haServices is ZooKeeperMultipleComponentLeaderElectionHaServices.
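Which implementation gets created is controlled by the high-availability configuration key. For example, with the ZooKeeper settings used later in the deployment section of this article:

high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

createHaServices then builds the ZooKeeper-backed services; without any high-availability setting, Flink falls back to the non-HA standalone services.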

1.2 The Leader Election Service

Step into:

clusterComponent =
    dispatcherResourceManagerComponentFactory.create(
        configuration,
        resourceId.unwrap(),
        ioExecutor,
        commonRpcService,
        haServices,
        blobServer,
        heartbeatServices,
        delegationTokenManager,
        metricRegistry,
        executionGraphInfoStore,
        new RpcMetricQueryServiceRetriever(
            metricRegistry.getMetricQueryServiceRpcService()),
        this);

DefaultDispatcherResourceManagerComponentFactory.java

1.2.1 Creating the Leader Election Service & the HA Service Factory

DefaultDispatcherResourceManagerComponentFactory.java

webMonitorEndpoint =
    restEndpointFactory.createRestEndpoint(
        configuration,
        dispatcherGatewayRetriever,
        resourceManagerGatewayRetriever,
        blobServer,
        executor,
        metricFetcher,
        highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
        fatalErrorHandler);

highAvailabilityServices.getClusterRestEndpointLeaderElectionService()

Here highAvailabilityServices is concretely a ZooKeeperMultipleComponentLeaderElectionHaServices object. Step into getClusterRestEndpointLeaderElectionService():


AbstractHaServices.java

public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    return createLeaderElectionService(getLeaderPathForRestServer());
}

Step into createLeaderElectionService(getLeaderPathForRestServer()):

ZooKeeperMultipleComponentLeaderElectionHaServices.java

protected LeaderElectionService createLeaderElectionService(String leaderName) {
    return new DefaultLeaderElectionService(
            getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName));
}

Therefore, for the webMonitorEndpoint object, the concrete implementation of leaderElectionService is a DefaultLeaderElectionService instance.

Step into getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName):

DefaultMultipleComponentLeaderElectionService.java

public LeaderElectionDriverFactory createDriverFactory(String componentId) {
    return new MultipleComponentLeaderElectionDriverAdapterFactory(componentId, this);
}

Step into new DefaultLeaderElectionService(getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName)):

DefaultLeaderElectionService.java

public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory) {
    this.leaderElectionDriverFactory = checkNotNull(leaderElectionDriverFactory);
    this.leaderContender = null;
    this.issuedLeaderSessionID = null;
    this.leaderElectionDriver = null;
    this.confirmedLeaderInformation = LeaderInformation.empty();
    this.running = false;
}

Therefore, in DefaultLeaderElectionService, the member field leaderElectionDriverFactory is an instance of MultipleComponentLeaderElectionDriverAdapterFactory.
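To summarize the wiring established so far: each component gets its own DefaultLeaderElectionService, but they all funnel into one shared election:

webMonitorEndpoint (the LeaderContender)
    -> DefaultLeaderElectionService
        -> MultipleComponentLeaderElectionDriverAdapter (componentId = the leaderName from getLeaderPathForRestServer())
            -> DefaultMultipleComponentLeaderElectionService (shared across all components, backed by a single ZooKeeper election)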

1.2.2 Election & Writing to ZooKeeper

DefaultDispatcherResourceManagerComponentFactory.java

webMonitorEndpoint.start();

Step into webMonitorEndpoint.start():

RestServerEndpoint.java

public final void start() throws Exception {
    synchronized (lock) {
        Preconditions.checkState(
                state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

        log.info("Starting rest endpoint.");

        final Router router = new Router();
        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

        handlers = initializeHandlers(restAddressFuture);

        /* sort the handlers such that they are ordered the following:
         * /jobs
         * /jobs/overview
         * /jobs/:jobid
         * /jobs/:jobid/config
         * /:*
         */
        Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

        checkAllEndpointsAndHandlersAreUnique(handlers);
        handlers.forEach(handler -> registerHandler(router, handler, log));

        ChannelInitializer<SocketChannel> initializer =
                new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws ConfigurationException {
                        RouterHandler handler = new RouterHandler(router, responseHeaders);

                        // SSL should be the first handler in the pipeline
                        if (isHttpsEnabled()) {
                            ch.pipeline()
                                    .addLast(
                                            "ssl",
                                            new RedirectingSslHandler(
                                                    restAddress,
                                                    restAddressFuture,
                                                    sslHandlerFactory));
                        }

                        ch.pipeline()
                                .addLast(new HttpServerCodec())
                                .addLast(new FileUploadHandler(uploadDir))
                                .addLast(
                                        new FlinkHttpObjectAggregator(
                                                maxContentLength, responseHeaders));

                        for (InboundChannelHandlerFactory factory :
                                inboundChannelHandlerFactories) {
                            Optional<ChannelHandler> channelHandler =
                                    factory.createHandler(configuration, responseHeaders);
                            if (channelHandler.isPresent()) {
                                ch.pipeline().addLast(channelHandler.get());
                            }
                        }

                        ch.pipeline()
                                .addLast(new ChunkedWriteHandler())
                                .addLast(handler.getName(), handler)
                                .addLast(new PipelineErrorHandler(log, responseHeaders));
                    }
                };

        NioEventLoopGroup bossGroup =
                new NioEventLoopGroup(
                        1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
        NioEventLoopGroup workerGroup =
                new NioEventLoopGroup(
                        0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

        bootstrap = new ServerBootstrap();
        bootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(initializer);

        Iterator<Integer> portsIterator;
        try {
            portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
        } catch (IllegalConfigurationException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    "Invalid port range definition: " + restBindPortRange);
        }

        int chosenPort = 0;
        while (portsIterator.hasNext()) {
            try {
                chosenPort = portsIterator.next();
                final ChannelFuture channel;
                if (restBindAddress == null) {
                    channel = bootstrap.bind(chosenPort);
                } else {
                    channel = bootstrap.bind(restBindAddress, chosenPort);
                }
                serverChannel = channel.syncUninterruptibly().channel();
                break;
            } catch (final Exception e) {
                // syncUninterruptibly() throws checked exceptions via Unsafe
                // continue if the exception is due to the port being in use, fail early
                // otherwise
                if (!(e instanceof java.net.BindException)) {
                    throw e;
                }
            }
        }

        if (serverChannel == null) {
            throw new BindException(
                    "Could not start rest endpoint on any port in port range "
                            + restBindPortRange);
        }

        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

        final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
        final String advertisedAddress;
        if (bindAddress.getAddress().isAnyLocalAddress()) {
            advertisedAddress = this.restAddress;
        } else {
            advertisedAddress = bindAddress.getAddress().getHostAddress();
        }

        port = bindAddress.getPort();

        log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

        restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

        restAddressFuture.complete(restBaseUrl);

        state = State.RUNNING;

        startInternal();
    }
}

Step into startInternal():


WebMonitorEndpoint.java

public void startInternal() throws Exception {
    leaderElectionService.start(this);
    startExecutionGraphCacheCleanupTask();

    if (hasWebUI) {
        log.info("Web frontend listening at {}.", getRestBaseUrl());
    }
}

Step into leaderElectionService.start(this):


DefaultLeaderElectionService.java

public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    synchronized (lock) {
        running = true;
        leaderContender = contender;
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    }
}

Step into leaderElectionDriverFactory.createLeaderElectionDriver():


MultipleComponentLeaderElectionDriverAdapterFactory.java

public LeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler,
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    return new MultipleComponentLeaderElectionDriverAdapter(
            leaderName, singleLeaderElectionService, leaderEventHandler);
}
Step into new MultipleComponentLeaderElectionDriverAdapter(leaderName, singleLeaderElectionService, leaderEventHandler):

MultipleComponentLeaderElectionDriverAdapter.java

MultipleComponentLeaderElectionDriverAdapter(
        String componentId,
        MultipleComponentLeaderElectionService multipleComponentLeaderElectionService,
        LeaderElectionEventHandler leaderElectionEventHandler) {
    this.componentId = Preconditions.checkNotNull(componentId);
    this.multipleComponentLeaderElectionService =
            Preconditions.checkNotNull(multipleComponentLeaderElectionService);

    multipleComponentLeaderElectionService.registerLeaderElectionEventHandler(
            this.componentId, leaderElectionEventHandler);
}
Step into multipleComponentLeaderElectionService.registerLeaderElectionEventHandler(this.componentId, leaderElectionEventHandler):

DefaultMultipleComponentLeaderElectionService.java

public void registerLeaderElectionEventHandler(
        String componentId, LeaderElectionEventHandler leaderElectionEventHandler) {

    synchronized (lock) {
        Preconditions.checkArgument(
                !leaderElectionEventHandlers.containsKey(componentId),
                "Do not support duplicate LeaderElectionEventHandler registration under %s",
                componentId);
        leaderElectionEventHandlers.put(componentId, leaderElectionEventHandler);

        if (currentLeaderSessionId != null) {
            final UUID leaderSessionId = currentLeaderSessionId;
            leadershipOperationExecutor.execute(
                    () -> leaderElectionEventHandler.onGrantLeadership(leaderSessionId));
        }
    }
}
Step into leaderElectionEventHandler.onGrantLeadership(leaderSessionId):


DefaultLeaderElectionService.java

public void onGrantLeadership(UUID newLeaderSessionId) {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = newLeaderSessionId;
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Grant leadership to contender {} with session ID {}.",
                        leaderContender.getDescription(),
                        issuedLeaderSessionID);
            }

            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Ignoring the grant leadership notification since the {} has "
                                + "already been closed.",
                        leaderElectionDriver);
            }
        }
    }
}
Step into leaderContender.grantLeadership(issuedLeaderSessionID):


WebMonitorEndpoint.java

public void grantLeadership(final UUID leaderSessionID) {
    log.info(
            "{} was granted leadership with leaderSessionID={}",
            getRestBaseUrl(),
            leaderSessionID);
    leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
}
Step into leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl()):


DefaultLeaderElectionService.java

public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress);
    }

    checkNotNull(leaderSessionID);

    synchronized (lock) {
        if (hasLeadership(leaderSessionID)) {
            if (running) {
                confirmLeaderInformation(leaderSessionID, leaderAddress);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the leader session Id {} confirmation, since the "
                                    + "LeaderElectionService has already been stopped.",
                            leaderSessionID);
                }
            }
        } else {
            // Received an old confirmation call
            if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Receive an old confirmation call of leader session ID {}, "
                                    + "current issued session ID is {}",
                            leaderSessionID,
                            issuedLeaderSessionID);
                }
            } else {
                LOG.warn(
                        "The leader session ID {} was confirmed even though the "
                                + "corresponding JobManager was not elected as the leader.",
                        leaderSessionID);
            }
        }
    }
}
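That completes the chain from webMonitorEndpoint.start() down to the confirmed leader information being written to ZooKeeper. Underneath the driver, Flink's ZooKeeper integration is built on Apache Curator (bundled as a shaded dependency). As a standalone illustration of the underlying mechanism only (this is not Flink code; the ZooKeeper address and latch path below are made up for the demo), a minimal Curator leader election looks like:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        // Demo values only: the ZK address and latch path are hypothetical.
        CuratorFramework client =
                CuratorFrameworkFactory.newClient(
                        "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/demo/leaderlatch");
        latch.addListener(
                new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        // Plays the same role as the grant path (onGrantLeadership) above.
                        System.out.println("Leadership granted");
                    }

                    @Override
                    public void notLeader() {
                        // Plays the same role as the revoke path.
                        System.out.println("Leadership revoked");
                    }
                });
        latch.start();

        Thread.sleep(60_000); // hold (or wait for) leadership for a minute
        latch.close();
        client.close();
    }
}

Every process that starts a latch on the same path competes for leadership; exactly one of them gets the isLeader() callback, just as exactly one contender receives onGrantLeadership() in the Flink classes traced above.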

2. Flink Installation & Deployment: Local Mode (Overview) | Standalone Cluster Mode | Standalone-HA Cluster Mode (Principle | Operation | Test)

This part is based on the 黑马程序员 Flink 1.12 (2021) tutorial videos.

2. Flink Installation & Deployment
2.1. Local Mode (Overview)
2.1.1. Principle
2.1.2. Operation
2.1.3. Test
2.2. Standalone Cluster Mode
2.2.1. Principle
2.2.2. Operation
2.2.3. Test
2.3. Standalone-HA Cluster Mode
2.3.1. Principle
2.3.2. Operation
2.3.3. Test

2. Flink Installation & Deployment

2.1. Local Mode (Overview)

2.1.1. Principle

1. A Flink program is submitted by the JobClient.
2. The JobClient submits the job to the JobManager.
3. The JobManager coordinates resource allocation and job execution. Once resources are allocated, the tasks are handed to the appropriate TaskManagers.
4. Each TaskManager starts a thread to execute its task and reports status changes back to the JobManager (started, in progress, finished).
5. Once the job completes, the result is sent back to the client (JobClient). A minimal job that goes through this flow is sketched below.
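As a concrete example, here is a minimal batch WordCount in Java, equivalent to the WordCount.jar example run later in this section (a sketch for illustration; the input path matches the test file prepared below):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // In local mode this spins up an embedded mini-cluster.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("/root/words.txt");

        text.flatMap(
                        new FlatMapFunction<String, Tuple2<String, Integer>>() {
                            @Override
                            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                                for (String word : line.split(" ")) {
                                    out.collect(new Tuple2<>(word, 1));
                                }
                            }
                        })
                .groupBy(0) // group by the word
                .sum(1) // sum the per-word counts
                .print();
    }
}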

2.1.2. Operation

1. Download the release package
https://archive.apache.org/dist/flink/

2. Upload flink-1.12.0-bin-scala_2.12.tgz to the target directory on node1

3. Extract it
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz

4. If you run into permission problems, fix the ownership
chown -R root:root /export/server/flink-1.12.0

5. Rename the directory or create a symlink
mv flink-1.12.0 flink
ln -s /export/server/flink-1.12.0 /export/server/flink

2.1.3. Test

1. Prepare the file /root/words.txt

vim /root/words.txt
hello me you her
hello me you
hello me
hello

2. Start the local Flink "cluster"

/export/server/flink/bin/start-cluster.sh

3. jps should now show the following two processes

- TaskManagerRunner
- StandaloneSessionClusterEntrypoint

4. Open the Flink Web UI
http://node1:8081/#/overview

A slot in Flink can be thought of as a resource group: Flink runs a program in parallel by splitting it into subtasks and scheduling those subtasks into slots.
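For example, with taskmanager.numberOfTaskSlots: 2 (the value configured in the standalone section below), each TaskManager offers 2 slots, so a cluster with 3 TaskManagers can run at most 3 × 2 = 6 subtasks in parallel.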

5. Run the official example

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out

6. Stop Flink

/export/server/flink/bin/stop-cluster.sh

Start the interactive Scala shell (note: at the moment none of the Scala 2.12 release packages support the Scala Shell)

/export/server/flink/bin/start-scala-shell.sh local

Run the following command:

benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()

Exit the shell

:quit

2.2. Standalone Cluster Mode

2.2.1. Principle

In standalone mode, one JobManager runs on the master node and one TaskManager runs on each worker node; clients submit jobs to the JobManager, which schedules the subtasks into the TaskManagers' slots.

2.2.2. Operation

1. Cluster plan

  • Server node1 (Master + Slave): JobManager + TaskManager
  • Server node2 (Slave): TaskManager
  • Server node3 (Slave): TaskManager

2. Edit flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true

# history server
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/

3. Edit the masters file

vim /export/server/flink/conf/masters

node1:8081

4. Edit the workers file (formerly named slaves)

vim /export/server/flink/conf/workers
node1
node2
node3

5. Add the HADOOP_CONF_DIR environment variable

vim /etc/profile
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

6. Distribute to the other nodes

scp -r /export/server/flink node2:/export/server/flink
scp -r /export/server/flink node3:/export/server/flink
scp /etc/profile node2:/etc/profile
scp /etc/profile node3:/etc/profile

for i in {2..3}; do scp -r flink node$i:$PWD; done

7. Source the profile

source /etc/profile

2.2.3. Test

1. Start the cluster by running the following on node1

/export/server/flink/bin/start-cluster.sh

Or start/stop the daemons individually

/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all

2. Start the history server

/export/server/flink/bin/historyserver.sh start

3. Open the Flink UI, or check the processes with jps

http://node1:8081/#/overview
http://node1:8082/#/overview

4. Run the official test example

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

5. Stop the Flink cluster

/export/server/flink/bin/stop-cluster.sh

2.3. Standalone-HA Cluster Mode

2.3.1. Principle

In HA mode, multiple JobManagers run at the same time, but only one of them is the active leader; leadership is decided through the ZooKeeper-based election analyzed in section 1. Standby JobManagers take over when the leader fails, and the metadata needed for recovery is stored under high-availability.storageDir on HDFS.

2.3.2. Operation

1. Cluster plan

  • Server node1 (Master + Slave): JobManager + TaskManager
  • Server node2 (Master + Slave): JobManager + TaskManager
  • Server node3 (Slave): TaskManager

2. Start ZooKeeper

zkServer.sh status
zkServer.sh stop
zkServer.sh start

3. Start HDFS

/export/server/hadoop/sbin/start-dfs.sh

4. Stop the Flink cluster

/export/server/flink/bin/stop-cluster.sh

5. Edit flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

Add the following content

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
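Once the HA cluster is up, the election results can be inspected directly in ZooKeeper. A quick check (the exact znode layout varies across Flink versions; /flink is the default value of high-availability.zookeeper.path.root):

zkCli.sh -server node1:2181
ls /flink

The leader and latch znodes created by the election services traced in section 1 live under this path.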

6. Edit the masters file so that both master nodes are listed

vim /export/server/flink/conf/masters
node1:8081
node2:8081

7. Sync the configuration to the other nodes

scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/

8. On node2, edit flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2

9. Restart the Flink cluster; on node1 run

/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh


10. Check with jps: no Flink processes have been started.

11. Check the log

cat /export/server/flink/log/flink-root-standalonesession-0-node1.log

The log contains an error caused by missing Hadoop support: since Flink 1.8, the official release packages no longer bundle the HDFS integration jar.
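Steps 12 to 14 below fix this with the shaded Hadoop jar. On Flink 1.11+ the officially documented alternative (assuming Hadoop is installed on every node) is to put the Hadoop classpath onto Flink's classpath instead:

export HADOOP_CLASSPATH=`hadoop classpath`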

12. Download the Hadoop integration jar, put it into Flink's lib directory, and distribute it so Flink can access Hadoop.
Download page:

https://flink.apache.org/downloads.html

13. Put the jar into the lib directory

cd /export/server/flink/lib


14. Distribute it

for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done

15. Restart the Flink cluster; on node1 run

/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh

16. Check with jps: the expected Flink processes are now running on all three machines.

2.3.3. Test

1. Open the Web UI

http://node1:8081/#/job-manager/config
http://node2:8081/#/job-manager/config

2. Run the WordCount job

/export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

3. Kill one of the master processes (a StandaloneSessionClusterEntrypoint)

4. Run the WordCount job again; it still completes normally

/export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

5. Stop the cluster

/export/server/flink/bin/stop-cluster.sh
