Flink 面试跳槽指南——带领你疏通航道

Posted 青冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 面试跳槽指南——带领你疏通航道相关的知识,希望对你有一定的参考价值。

Flink 面试跳槽指南(1)

作者:Hadi

时间:2022年2月7日

参考各种blog和官方文档,纯手打,如果差错请评论区见,或者提交到CSDN用户:https://blog.csdn.net/qq_36610426

如需转载,也请联系作者。
文章地址:https://blog.csdn.net/qq_36610426/article/details/122821112


Flink 基础

Flink 介绍

Flink是一个面向分布式数据流处理 和 批处理数据的开源计算引擎。用于对无界和有界的数据流进行有状态计算。可以部署在个中级群环境,对各种大小的数据规模进行快速内存计算。

Flink 和Spark Streaming的区别

Flink 和 Spark Streaming都是内存计算引擎,但Flink是标准的实时处理引擎,基于事件驱动。而Spark Streaming是微批(Micro-Batch)计算。

Flink更能对数据进行状态性的操作,设置有water mark等标识,实现的是真正的实时处理计算。

Flink的组件栈有哪些

自下而上,每一层都有具体的含义。

Deploy层

基座部署层,这一层主要是涉及到Flink的部署模式,上图我们可以看出,Flink支持在单个Single JVM的Local本地部署,也能基于集群模式的Standalone和on Yarn模式,也能基于云的GCE/EC2部署模式。

Core层

核心层只包含一个内容就是RunTime,提供了Flink的计算核心算法实现,其主要内容就是实时处理和分布式计算相关内容。为上层API层提供核心处理运算,JobGraph到ExecutionGraph的映射等等。

API层

接口层主要是实现了面向流和面向批处理API,其中面向流的就是DataStream API,面向批处理的就是DataSet API。往后这两个API会慢慢统一,合并趋近一致。

Libraries层

框架层,根据API的接口,在API层之上构建满足特定应用的实时计算框架,也分为流处理和批处理两大类。对于流处理支持的 CEP复杂时间处理、基于SQL-like的Table关系操作;面向批处理支持的FlinkML,Gelly等等。

Flink 的运行必须依赖 Hadoop组件吗

不是必须依赖,但属于大数据生态圈中,现在没Hadoop就好像西方没有耶路撒冷。

根据Deploy层可以看到,我们可以基于单机JVM运行,也能基于集群的Standlone模式运行,也可以基于EC2等的云部署运行,所以我们单单运行Flink是可以的。

你们的Flink集群规模多大

提出此开放性问题主要是看看你有没有具体使用过Flink,使用过多大的范围,最好借这个问题引出你自己擅长的下一个话题。

我们的Flink程序都是通过 on Yarn进行部署运行的,通过Yarn的队列资源来运行Flink任务。由于31省都有业务,所以我们有31省的Yarn集群,总共2000+台服务器,其中Flink任务资源占10%,大概运行的实时程序实例有700+,都是我一个人独立开发运行的。这么多实时程序任务,最大的问题其实是监控怎么做,所以我们在监控中下足了功夫(引出其他话题……)。

Flink的基础编程模型了解吗

Flink程序流程基本都是 Source -> Transformation -> Sink的流程。

每个任务都是从一个或者多个Source开始,并终止于一个或者多个Sink,中间可以有各种转换、映射、合并、拆分等操作,具体代码示例:

Flink集群有哪些角色,其各自有什么作用

Flink集群中一般有客户端、Flink Master、TaskManager,其结构如下图:

Flink Client

Flink客户端主要包括Program code,将代码进行解析,通过Graph Builder生成Job Graph,最后通过Client客户端提交给Flink集群。

Flink Master

Flink的管理中心,内部主要包含三个组件。

Dispatcher

DisPatcher的作用是从Client端接收作业,并代表Client端在集群管理器上启动作业。总体上来看可以看做分发器。

其是在AppMaster启动完毕后创建的,AppMaster的主类是YarnJobClusterEntrypoint(per-job模式)或YarnSessionClusterEntrypoint(session模式),最后通过AbstractDispatherResourceManagerComponentFactory.create方法创建并启动:

// AbstractDispatcherResourceManagerComponentFactory
public DispatcherResourceManagerComponent<T> create(
        Configuration configuration,
        ...) throws Exception 

        //创建webMonitorEndpoint并启动
        //创建resourceManager并启动

        //创建dispatcher并启动
        //Per-Job模式创建MiniDispatcher,Session模式创建StandaloneDispatcher
        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);
        //其实就是启动了rpc endpoint
        dispatcher.start();
    

Dispatcher的作用:

  • Dispatcher是可以夸作业运行,为Client端提供REST接口方便Client提交作业。

  • 当一个应用被提交执行的时候,分发器就会启动并将应用提交给JobManager。

  • Dispatcher会启动一个WebUI,方便战士和监控作业执行的信息。

  • Dispatcher并不是必须的,取决于应用作业的提交方式。

JobManager

是集群任务管理中心,是整个集群任务的协调者。负责接收Flink Job,协调任务调度Scheduler,管理checkpoint,Failover故障恢复等,同时管理Flink集群中的各个TaskManager从节点。

控制一个应用程序执行的主程序,也就是说同一个Flink Session被同一个的JobManager所控制。

集群中至少要又一个Master,可以使用HA,但要保证其他的是Standby。

JobMananger中包含Actor System、Scheduler、CheckPoint三个重要组件分别作用如下:

Actor System

参与者系统,是具有各种角色、演员 Actor的容器container。提供如调度、配置、日志等服务记录。还包含一个线程池,包含所有actor。

每个Actor都将分配一个父级,类似于linux,每个actor都会有写入队列。如果多个actor是同一个地址(JVM),那么他们可以通过共享内存来传递消息,如果是远程的,则通过RPC来调用传递消息。如果Actor的状态发生错误,则其父级会收到通知,如果父级能解决,则解决,不能继续上升到父级。

Scheduler

任务调度中心,主要调度任务的启停,验证任务的成功。

Checkpoint Coordinator

checkpoint 协调器协调 operators 和 state 的分布式快照。 它通过将消息发送到相关任务来触发 checkpoint,并收集 checkpoint 确认。 它还收集并维护由确认 checkpoint 的任务报告的状态句柄。

具体的代码解析可以参考 https://www.cnblogs.com/Springmoon-venn/p/13530188.html

Resource Manager
  1. 主要负责管理任务管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定义的处理资源单元。
  2. Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARNMesos、K8s,以及 standalone部署。
  3. 当 Jobmanagerl申请插槽资源时, Resourcemanager会将有空闲插槽的Taskmanager?分配给Jobmanager。如果 Resourcemanagery没有足够的插槽来满足 Jobmanagerf的请求, 它还可以向资源提供平台发起会话,以提供启动 Taskmanageri进程的容器。

Task Manager

真正执行任务的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。其本身就是一个RPC服务器,提供了任务提交、任务取消等接口,同时包含JobLeaderService、TaskSlot管理、资源管理、心跳检测、存储服务。

TaskSlotTable

TaskManager中最重要的就是TaskSlotTable,是指Flink任务最小的Task任务容器,他与普通线程不同的是,Slot可以分配到独立的内存和状态管理能力,不同的JobVertex可以通向Slot。

当TaskManager创建后,就会创建TaskSlotTable并添加获取Task,申请释放Slot。

JobLeaderService

JobLeaderService用于监听Master。如果Master节点改变,会通知JobLeaderService,内部是以JobId为Key来保存LeaderRetrievalService和JobManagerLeaderListener。

ResourceManager

不是Flink Master中的ResourceManager,而是TaskManager中的一个模块,用于监听ResourceManager中的主节点。如果主节点有变化,会通知ResourceManagerLeaderReriever。

HeartbeatManager

心跳检测模块包含JobManager和ResourceManager的心跳检测。

BlobCacheService

存储服务器,包含PermanentBlobService:可恢复,数据上传到BlobStore分布式文件系统;

TransientBlobService:不可恢复,数据不会上传到BlobStore分布式文件系统。

说说Flink 资源管理中 Task Slot 的概念

在TaskManager中,我们说最小任务资源单位就是TaskSlot。TaskManager会将自己管理的资源分为不同的Slot,他们享有自己独立的内存隔离,没有CPU隔离。

TaskManager其实就是一个JVM,所以Slot们还可以共享上下文信息、TCP连接、心跳消息、各种数据结构等。

在默认情况下,Flink允许子任务共享Slot,当然必须满足:它们属于同一个Job,并且不是同一个operator的子任务。所以当我们在写程序的时候,很可能一个Slot中包含一个Job的完成pipeline。这样的好处有:

Flink计算一个Job所需的Slot数量时,只需要确定pipeline上的最大并行度即可,而不用考虑每一个任务的并行度。

能够更好的利用资源,小消耗的Slot与大消耗的Slot本来任务所需的资源不一致。

Flink 资源管理中 Task Slot 的概念

DataStream 2 DataStream

map、flatMap、filter

转换、压平、筛选

其他转换的数据流

keyBy、window、reduce、fold、sum、max、windowAll、connect、Join、apply、coGroup。

Flink分区策略

Flink的分区策略应该指的就是数据分区,一般称之为Partition。

StreamPartitioner是Flink中的数据流分区抽象接口,决定了在实际运行中的数据流分发模式,将数据切分交给Task计算,每个Task负责计算一部分数据流。所有的数据分区器都实现了ChannelSelector接口,在这个接口中定义了负载均衡的选择行为。

ChannelSelector

public interfaceChannelSelector<T extends IOReadablewritable>  
    //下游可选 Channel 的数量
    void setup (intnumberOfChannels); 
    //选路方法
    int selectChannel (T record); 
    //是否向下游广播
    boolean isBroadcast();
 

在这个接口可以看到,每一个分区器都知道下游通道的数量,在某一次任务运行的时候是固定的,除非刻意更改作业的并行度,否则不会更变。

分区策略

一共有8种分区策略,数据分区体系如下图:

GlobalPartitioner

会将所有数据都分发到下游的第一个实例中。

ShufflePartitioner

每次来一个都随机一个下游进行分发,这个分发的的特点是,相同数据不一定在一个下游中,使用环境很少。

BroadcastPartitioner

发送数据到每一个下游中,一般用于dim表,小表分发,配置分发的场景。

RebalancePartitioner

轮循下发数据到下游中。这个轮循是上游每个的Partitioner的轮循,并不是全局轮循。

RescalePartitioner

通过计算下游Operator的并行度,将记录以循环的方式输出到下游的Operator的每个实例。通过下图可以看到他是

其实这个算法跟 RebalancePartitioner 是一样的算法,都是获取numberOfChannels然后取余,但是下面有个值被偷偷摸摸设置为了true ——isPointwise。

当这个值isPointwise被设置为了true以后,后续的解析行为会更变实现方法,导致后续的实现会变成特定节点只跟特定集合下游节点参与分配。详情跳转。我们知道Flink会将任务拆解为StreamGraph,提交JobGraph,最后执行ExecutionGraph。在StreamGraph转化为JobGraph中的时候,查看其代码的801行可以看到:

可以看到分布方式被指定成了POINTWISE,我们直接进入DistributrionParttern看看这个POINTWISE用到的哪里:

通过这个值,在JobGraph到真正执行的ExcutionGraph中时(EdgeManagerBuildButil.connectVertexToResult),使用connectPointwise去执行,那和connectAllToAll有什么区别呢:

connectAllToAll

很简单,上游分配所有下游节点,下游节点获取所有上游。

connectPointwise

主要考虑上下游的节点数进行分配计算,那么就会有三种情况:

sourceCount = targetCount
sourceCount > targetCount
sourceCount < targetCount

当上下游节点相等时直接一一对应即可:


当上游节点大于下游的时候:

其实就是多个上游节点要对应同一个下游节点,那么怎么分配呢?公式为:
f ( i n d e x ) = s u b A r r a y ( i n d e x ∗ S o u r c e C o u n t / t a r g e t C o u n t , ( i n d e x + 1 ) ∗ S o u r c e C o u n t / t a r g e t C o u n t ) f(index)=subArray(index*SourceCount/targetCount,(index+1)*SourceCount/targetCount) f(index)=subArray(indexSourceCount/targetCount,(index+1)SourceCount/targetCount)
注意这里的除法不保留小数。

上下游比率SourceCount/targetCount,在通过index滑动取分区。


当上游节点小于下游的时候:

意味着上游节点要对应多个下游节点,使用公式:
f ( p a r t i t i o n N u m ) = ( p a r i t i o n N u m ∗ t a r g e C o u n t + s o u r c e C o u n t − 1 ) / s o u r c e C o u n t f(partitionNum)=(paritionNum*targeCount + sourceCount -1)/sourceCount f(partitionNum)=(paritionNumtargeCount+sourceCount1)/sourceCount
通过partitionNum进行滑动,这样上游的节点就可以向固定下游进行发送数据,且下游节点有且仅有一个上游节点。这个算法考虑的还是挺不错的,可以多看看。


ForwardPartitioner

发送到下游对应的第一个task,与ResscalePartitioner相同,他们的isPointwise被指定为true:


KeyGroupStreamPartitioner

根据key的分区索引选择发送到相对应的下游subTask。

终于有一个使用真实数据来规定走哪一个下游了,具体代码看这段:

具体实现为:

keyGroupId的计算方式:

murmurHash Hash算法

一种非加密型的Hash操作,可以有较强规律的key,随机分布表现优秀。具体后续再写一篇文章介绍。


CustomPartitionerWrapper

通过自定义的方法Partition,将记录输出到下游中。

在这个中,用户必须自己提供一个Partitioner以计算正确的映射关系:


默认策略

在上下游的Operation并行度一致,且没有指定分区器的情况下,使用ForwardPartitioner;如果指定了分区器则使用RebalancePartitioner。

Flink的并行度是怎样的

并行度Parallel,是对每个subTask进行设定的。而并行度是可以用户指定的,与Java等参数一样,进行配置的时候会有层次关系:

  • Operator Level 算子层次
  • Execution Environment Level 执行环境层次
  • Client Level 客户端层次
  • System Level 系统层次

越往下,覆盖范围越大;相对的越往上,覆盖力度越大。

Operator Level

写在代码里,单独对subTask的设置:

        final SingleOutputStreamOperator<CmConfig> cm = env.readFile(cmFormat, pathString
                        , FileProcessingMode.PROCESS_CONTINUOUSLY, 60 * 60 * 1000)
                .map((MapFunction<Row, CmConfig>) CmConfig::new)
                .setParallelism(10)
                .returns(CmConfig.class);

上图setParallelism就是设置这单个subTask的并行度。

Execution Environment Level

当然我们程序一般会有一个env的默认并行度:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.minutes(3), Time.minutes(1)));
        env.setParallelism(parallelism);
        env.disableOperatorChaining();

上面代码的env.setParallelism(parallelism);就是针对整个任务执行的默认并行度。

Client Level

当我们提交job到Flink的时候,可以通过

FLINK_HOME/bin/flink run -p 10 xxxx.jar

进行任务的提交进行设置并行度 -p 10 。

System Level

在我们部署flink的时候,FLINK_HOME/bin/flink是会加载配置文件的:flink-conf.yaml中的parallelism.default来指定全局默认并行度。一般情况下不推荐设置该值。

Flink的Slot和parallelism有什么区别

Slot是Flink中的最小计算资源,受到TaskManager的管控,在同一个TaskManager中的Slot共享一个JVM,TCP、心跳连接等都可以共享。

Parallelism本质是任务的并发数,在Flink中,一个作业会被拆分为若干个subTask,这些subTask会有自己的并发数Parallelism。

Slot唯一能和Parallelism扯上关系的就是我们一般只会指定TaskManager的数量,和每个TaskManager中的Slot数量。而计算真正计算TaskManager数量的就是通过作业的最大subTask/单TaskManager中的Slot来计算TaskManager的个数。

Flink的重启策略

Flink任务在失败的时候,会采用一定的策略来自适应这些报错。重启类在:org.apache.flink.api.common.restartstrategy.RestartStrategies中,具体的重启规则有:

NoRestartStrategy

不进行重启,当任务失败的时候直接进行失败操作,不对任务进行重启判断。

    /**
     * Generates NoRestartStrategyConfiguration.
     *
     * @return NoRestartStrategyConfiguration
     */
    public static RestartStrategyConfiguration noRestart() 
        return new NoRestartStrategyConfiguration();
    

具体的实现方式如下:

    /** Configuration representing no restart strategy. */
    public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration 
        private static final long serialVersionUID = -5894362702943349962L;

        @Override
        public String getDescription() 
            return "Restart deactivated.";
        

        @Override
        public boolean equals(Object o) 
            if (this == o) 
                return true;
            
            return o instanceof NoRestartStrategyConfiguration;
        

        @Override
        public int hashCode() 
            return Objects.hash();
        
    

看这个方法其实看不出来什么内容,其实在org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory方法中,依次判断了是哪种重启策略,遵循什么样的重启规则。当发现为noRestart的时候,则直接不重启。


FixedDelayRestartStrategy

在一定重试次数之内进行重启操作。超过重试次数之后,不再重启。

    public static final class FixedDelayRestartStrategyConfiguration
            extends RestartStrategyConfiguration 
        private static final long serialVersionUID = 4149870149673363190L;

        private final int restartAttempts;
        private final Time delayBetweenAttemptsInterval;

        FixedDelayRestartStrategyConfiguration(
                int restartAttempts, Time delayBetweenAttemptsInterval) 
            this.restartAttempts = restartAttempts;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        

        public int getRestartAttempts() 
            return restartAttempts;
        

        public Time getDelayBetweenAttemptsInterval() 
            return delayBetweenAttemptsInterval;
        

主要包含两个参数:重试次数&重试延迟(毫秒)。对应的实现就是org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy。


FailureRateRestartStrategy

一种基于失败率的重启策略。需要传入三个值:最大失败率(次数)、失败间隔、重启间隔。

    /** Configuration representing a failure rate restart strategy. */
    public static final class FailureRateRestartStrategyConfiguration
            extends RestartStrategyConfiguration 
        private static final long serialVersionUID = 1195028697539661739L;
        private final int maxFailureRate;

        private final Time failureInterval;
        private final Time delayBetweenAttemptsInterval;

        public FailureRateRestartStrategyConfiguration(
                int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) 
            this.maxFailureRate = maxFailureRate;
            this.failureInterval = failureInterval;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        
        ……
    

只要在一段时间内,还没有达到最大失败次数,那么就间隔重启间隔后重启程序。


ExponentialDelayRestartStrategy

无限次指数递增重启策略。当任务失败的时候,会进行重启,重启间隔为失败次数的指数递增;没有最大重启次数限制,无限尝试重启任务。

    /** Configuration representing an exponential delay restart strategy. */
    public static final class ExponentialDelayRestartStrategyConfiguration
            extends RestartStrategyConfiguration 
        private static final long serialVersionUID = 1467941615941965194L;

        private final Time initialBackoff;
        private final Time maxBackoff;
        private final double backoffMultiplier;
        private final Time resetBackoffThreshold;
        private final double jitterFactor;

        public ExponentialDelayRestartStrategyConfiguration(
                Time initialBackoff,
                Time maxBackoff,
                double backoffMultiplier,
                Time resetBackoffThreshold,
                double jitterFactor) 
            this.initialBackoff = initialBackoff;
            this.maxBackoff = maxBackoff;
            this.backoffMultiplier = backoffMultiplier;
            this.resetBackoffThreshold = resetBackoffThreshold;
            this.jitterFactor = jitterFactor;
        
           ……
    

可以看到一共有五个入参,分别为:

  • initialBackoff 第一次重启时间间隔

  • maxkBackoff 最大的重启时间间隔

  • backoffMultiplier 指数系数

    每次任务失败后重启间隔乘以这个系数。

  • resetBackoffThreshold 当过了多长时间后将任务重启间隔回归到initialBackoff

  • jitterFactor 重启时间的抖动系数

    当任务重启的时候,会随机或加或减配置项系数的一个随机数。


FallbackRestartStrategy

使用集群的默认重启策略。其实还是以上的四种之一而已,并不在程序内进行设置,而是采用提交任务时,或者平台配置时的默认策略。


Flink中的分布式缓存

分布式缓存听起来总感觉怪怪的,其实就是类似于spark程序提交的时候 -f 一个文件上去,分发到各个节点中。

这个每个节点都能获取到这个文件数据。

与广播变量的不同是广播变量是分发到各个TaskManager节点的内存中,而分布式缓存是将文件缓存到各个TaskManager节点上。

使用代码:

env.registerCachedFile("hdfs:///path/file", "cachefilename");
File myFile = getRuntimeContext().getDistributedCache().getFile("cachefilename");
List<String> lines = FileUtils.readLines(file);

可以直接在RichFunction中,对open进行重写,将文件中的数据读取出来,存放到内存中。


Flink中的广播变量

在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,甚至不在同一个TaskManager中,也就是不在一个Jvm中。比如这样一个情景:

我们需要设计一个算子,将所有数据根据一个文件中的kv关系进行映射。

将原始数据中的城市编码转换为城市名,需要根据一个离线文件中的数据进行关联。

离线文件广播

这样一个离线文件被读取后需要分发到每个下游中,并且每个下游都有这个离线文件汇总的全量数据。

需要注意的是,广播变量是将数据广播到内存汇总,广播的数据不能太大,不然会OOM,广播变量原则上来说不可修改。

		//1.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
        //2.Source
        //学生数据集(学号,姓名)
        DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(
                Arrays.asList(Tuple2.of(1

以上是关于Flink 面试跳槽指南——带领你疏通航道的主要内容,如果未能解决你的问题,请参考以下文章

面试指南:想跳槽,你真的做好准备了吗?

吐血整理!面向Java开发者的复习指南!请把这些学完再去面试

2021年Android开发者跳槽指南,Android工程师面试该怎么准备?附答案

面试指南:想跳槽,这些技术点你会了吗?

CURD程序员的春天!跳槽直通车:阿里面试指南(恒山版)

2022上半年《Android面试题合集+完整体系学习指南》,一步实现跳槽涨薪