首发Flink新一代流式计算框架的体系架构及应用
Posted 海数据实验室
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了首发Flink新一代流式计算框架的体系架构及应用相关的知识,希望对你有一定的参考价值。
【导读】
本内容总结自TalkingData资深大数据研发工程师王剑先生在2017年7月26日海数据技术沙龙第34期上所分享的内容。
以下为分享内容:
1.Flink简介
首先Flink是一个支持分布式、高性能、高可用性、高可靠性的流式处理框架,具有也下特点:
延迟低
吞吐量高
支持状态
保证数据不出错(目前所有流式处理框架中最好的)
2、Storm、Spark Streaming及Flink流式计算的比较
在开源大数据生态中,Storm是第一代流式计算框架的代表,但它不支持状态管理,即Storm中的状态数据需要用户自己存储在外部存储系统中,数据的持久化和一致性都需要用户自己保证,这会给应用带来一定复杂度。
后续出现的Samza是支持状态管理的第二代流计算技术,内部利用leveldb和kafka来存储数据,但Samza只能保证at least once不丢数据,但无法保证exactly once的强一致性,这在一些严格的场景下是有局限性的。
近几年火爆的Spark也很快推出了配套的Spark Streaming技术,但其本质上是通过连续不间断的Mini Batch来实现流式处理的,不是纯粹的流式计算思想,时效性上也有一定局限性。micro-batch就意味着他的latency是比较高的,这一点相对于Flink是硬伤。只有最新一代的Flink是相对最为纯粹和完善的流计算技术,在理论模型上具备了一切流计算的特质,也是支持Apache Beam最好的Runner。
3.Flink的应用场景
Flink具有以下四种使用场景:
电商实时的搜索结果。
数据科学团队提供流处理服务。
网站/传感器的监控器和错误的侦察。
商业智能分析平台的数据加载。
4.Flink支持的存储特点
目前Flick支持Redis,Kafka,Casandra,Elasticsearch,Flume,Rabbitmq,Twitter,HDFS等数据存储形式,值得注意的一点是:目前Flink不支持关系型数据如MySQL等,主要原因是mysql不能保证Exactly-Once,但是Flink本身支持Exactly-Once。
5.Flink的作业和数据流
Flink程序是由streams和operator构成的,stream可以认为是数据的中间结果,而operator是对一个或多个input stream的操作,operator可以输出一个或多个out stream。operator分为source operator、transformation operator和sink operator。当任务被提交后,Flink程序被映射为streaming dataflow,也就是描述任务的DAG图。
6.Flink的并行数据流
Flink支持并行(Parallel)与分布式,在Flink中,streams被拆分为stream partitions,operators被拆分为operator subtasks。同一个operator下的operator subtask可以独立的运行在不同服务器或containers的不同线程上,operator subtask的数量等于operator的parallelism。在一个dataflow中,不同的operators可以有不同的parallelism。
7.Flink的任务核操作链
在分布式的执行环境中,Flink将operator subtasks链成task。每个task由一个线程执行。将operator链成task可以减少线程的切换、buffering,提高了整个程序的吞吐量。operator subtasks链成task可在API中配置。
8.Flink的任务槽
每个Worker是一个JVM进程,它可以在不同的线程中执行一个或多个subtasks。Worker进程中的task slots用于控制一个worker进程能够接收多少tasks。每个task slot代表了TaskManager中的固定的资源(的子集)。
通过调整task slots的数量,可以调整subtasks之间的隔离程度。在相同JVM中运行的subtasks能够共享TCP connections和心跳信息,它们之间也可以互相共享data sets和data structure,因此阻止了每个task overhand。
9.窗口
聚合计算在流式处理与批式处理有很大的不同,因为流是没有边界的,我们无法拿到所有的数据做运算并得出结果。因此,为了解决在流式系统中的数据的聚合计算,Flink引入窗口的概念。流上面的聚合操作(count, sum等)会被scope成一个个window。窗口可以分为时间驱动的窗口(tumbling window(no overlap)、sliding window(with overlap))和数据驱动的窗口(session window)。
10.State & Fault Tolerance
在Flink中,有的operations在dataflow中只需要关注一个事件信息(例如:event parser),有的operators需要记住多个不同的event的信息(比如window operator),这类的operators被称为stateful operators。stateful operators的state被存在一种类似key/value的存储中。状态和带着这些状态的stream会被stateful operators严格地进行partition和distribution以供有状态的operator进行读取。
Architecture
1.独立集群
当一个Flink集群启动的时候(./start-cluster.sh),它会启动JobManager进程和TaskManager进程。JobManger进程是Flink coordinator,而TaskManger进程是执行parallel job的worker进程。如果Flink以local的模式启动,那么JobManager和TaskManager会共用一个JVM,如果以cluster的模式启动则JobManager和TaskManager都在不同的JVM中。当一个job提交的时候,就会生成一个environment client,client会对job进行预处理并将程序转换为JobGraph,JobGraph会被提交给JobManager。JobManager 分发任务给 TaskManager 去执行,然后 TaskManager 会发送heartbeat和汇报job状态。
2.基于Yarn的Flink
基于YARN就意味着可以进行资源的隔离,而且更易部署和管理,能和上面的存储引擎(Hadoop/HBase等)进行本地交互等等。
Flink运行作业分为两步:
1.创建AppMaster,申请资源并启动进程。
2.提交作业。
Flink On YARN的执行步骤:
1、启动yarn session,分配必要资源。
2、client会首先检查是否必要的资源已经准备就绪 (包括:containers and memory). 然后,client会上传一个包含flink jar和Flink相关配置到HDFS。
3、Client发送request,并请求一个YARN container来启动ApplicationMaster
4、YARN的NodeManager将会去HDFS上下载flink jar和Flink相关配置并准备好container的资源和配置。
5、当以上步骤完成后,YARN Application Master (AM) is started。Flink JobManager和YARN Application Master是运行在同一个container上面的。在这之后,YARN Application Master将会为TaskManager分配container, 这些container将会去HDFS下载jar文件并修改configuration。
6、完成上述步骤,Flink就已经准备好接收job任务了。
3.Flink作业的生命周期
一个Flink Job的生命周期有Created,Runing,Finished,Faied,Canceled,Suspended等状态,这些状态之间是相互跳转的,图中列出了详细的转换关系。
4.阿里巴巴的Flink
Blink是阿里巴巴搜索的流计算和批处理引擎。相比于Flink,在上层,Blink具有批和流一体化的完备Table API,使得其能够支撑各类业务需求;在底层,Blink重新开发了兼容Flink以及生态的Runtime,实现了流处理和批处理完美的统一。
1.流图和作业图
下面介绍Flink的work flow。概括来讲,每个job的执行都是由Client生成StreamGraph,再根据StreamGraph优化生成对应的JobGraph,Client将JobGraph发送给JobManager,JobManager根据JobGraph生成对应的ExecutionJobGraph最后发送给TaskManager来执行。
2.执行图
ExecutionGraph是JobManager描述data flow执行的核心数据结构,本质上它就是JobGraph的并行执行版本。它表述了并行任务、中间结果以及并行任务与中间结果的联系。它由以下部分组成:
1、ExecutionJobVertex: 表示JobGraph的一个节点即 JobVertex。它由JobVertexID唯一标志。
2、ExectionVertex: 代表了一个并行的subtask。对于每个ExecutionJobVertex,其并行度的数量就是ExectionVertex的数量。
3、IntermediateDataSet 也映射为了 IntermediateResultPartition。
4、JobManager在生成了ExecutionGraph之后,就可以向TaskManager提交任务了。
3.任务
这里最重要的就是Task类了,Task类描述了真正在TaskManager上运行的任务:Task代表了一个parallel subtask在TaskManager中的一次执行,它封装了operator或者operator chain并且会执行这些operator。TaskManager会为Task的执行提供了必要的组件和服务,Task对应的就是一个ExecutionVertex,而且每个task有一个线程来执行。
4.输入门和结果分区
ResultPartition是IntermediateResultPartition的运行时表现,Flink还会进一步将RP分成ResultSubPartition。在ResultSubPartition向下一个Task发送数据时,它会向BufferPool申请Buffer,Task要传输的事件或者记录都会被序列化到Buffer中,再由Netty向下一级Task的InputGate发送数据。
TypeInformation & Serialization
1.类型信息
Flink想要尽可能掌握进出用户函数的数据类型的信息(TypeInformation)。这样做有以下几点原因:
1、为了可以使用Pojo并且可以利用Pojo的field name来进行grouping、joining等操作,Flink在job执行之前检查类型信息;
2、类型信息知道的越多,compiler和optimizer就可以编出更好的data layout schema以及更好的序列化方式。这对flink的内存使用非常重要,因为更高效的序列化和反序列会使得数据进出堆变得非常高效;
3、对于即将推出的逻辑程序(logic program),我们需要这些信息来识别函数的“scheme”;
4、TypeInformation可以让用户不再操心序列化框架的使用(例如如何将他们自定义的类型注册到序列化框架中)。
2.序列化程序
默认的,Flink支持Avro和Kyro两种序列化框架。对于基本的数据类型,Flink自带序列化器;对于Pojo类型Flink会优先使用Avro序列化;对于GenericTypeInfo,Flink使用Kryo序列化。使用Avro进行序列化会比使用Kryo序列化高效许多。我们可以通过对ExectutionEnvironment进行配置全局的序列化框架,也可以制定自己的序列化方式。
1.容错的概念
失败恢复机制,这个机制可以保证,即使线上环境job或cluster失败,依然保证不会丢数据。 Flink Job失败,重启operator后,将他们重置为最新成功了的checkpoint。输入流会被重置为latest state snapshot点。因为Flink的checkpoint是通过分布式snapshot实现的,所以这里我们对snapshot和checkpoint不进行特别区分。
2.检查点和快照
1、Checkpoint设置间隔时间,去做分布式snapshot。
2、Chandy-Lamport算法就是分布式snapshot和恢复。传统流处理都是事务性解决fail,也就是数据存内存中,要不然就全成功,要不然就全失败。
3、State少的情况下非常轻量级,速度快,不影响性能。
4、异步,AsyncCheckpointed, SyncCheckpointed. 默认异步,不影响流处理,所以有checkpoint有pending、completed状态。
5、根据Checkpoint设置间隔时间,周期触发。
6、监听时间间隔并触发checkpoint并协同,这个是JobManager职责,具体应答scheckpoint去做snapshot,这个是TaskManager。
3.障碍
1、Barriers作为data stream的一部分随着记录被注入到data stream中。
2、Barriers将数据流中的记录隔离成set of records,并将集合中的数据加入到当前的snapshot中,而后面的数据加入到下一个snapshot中。
3、每一个barriers携带着snapshot的ID,snapshot记录着ID并且将其放在snapshot数据的最前端。checkpoint barrier的ID是严格单调增长的。ID能在zookeeper中看到。
4.、JobManager 作为checkpoint coordinator主要负责调度 Job ,并协调 Task 做 checkpoint。JobManager会发送Barrier控制流,将Source读入的数据分成分批,而Source可以随时的读入并发送数据,通过checkpoint coordinator添加Barrier分批后数据发送给后面的transform operator。
4.单数据流
JobManager会发送Barrier控制流,将Source读入的数据分成分批。Barrier的发送完全是按照checkpoint时间间隔来的,我们可以根据恢复时间的需求来设置间隔。Flink在checkpoint时会把state复制一份,然后进行异步的checkpoint做snapshot,尽量不影响正常的计算。
Checkpoint有两种状态:pending, completed
5.多数据流
数据对齐是分布式一致性snapshot的关键。Flink也就是靠着Barrier这种机制来进行分布式一致性的对齐(align)。
如图描述了节点的4个对齐状态:
1、开始对齐,这时已经收到A的数据,但是B的数据还没来,需要对齐。
2、收到上游A来的下一批的数据,需要把这些数据阻塞住(也就是ResultPartition缓存起来)。
3、收完B发来的这批的数据,第n批的数据已经到齐了,开始进行checkpoint,并发送Barrier到它的下游去。
4、继续下一批的对齐。
总结对齐的一句话:一旦operator从外来流中收到barriers n,它就不能处理这个流中后面的数据,直到它从其他输入中接收到barriersn。否则,会混合属于snapshot n以及snapshot n+1的记录,无法做snapshot无法恢复exacly_once。
6.状态
当没有enable checkpoint的时候,source operator消费kafka的offset会记录在zookeeper上。如果enable checkpoint后,source operator消费kafka的offset会记录会以snapshot存在state backend存储中。
Flink提供了Exactly_once特性,就是依赖于带有barrier的分布式snapshot + 可重发的数据源source功能实现的。而分布式snapshot中,就保存了operator的state信息。
7.后台状态
第一列是source stateful operator, 记offset信息,第二列是stateful transform operator记录state信息。最后sink记录ack表示checkpoint变为completed状态。
8.数据一致性
关于Exactly Once:
1、一般的流式处理框架对Exactly Once的处理都是要求state的事务性,数据放内存中,要不然全成功,要不然失败重新来。但是Flink的做法是使用周期的进行一致性分布式snapshot的checkpoint,然后在任务失败后,恢复到上一个一致性snapshot中。
2、每条记录将在operator状态中只被replay一次。例如,如果有一个用户在一个job中使用了统计元素个数的operator。
3、这里的Exactly Once模式也并不保证Flink在跟外部系统交互也满足Exactly Once(Flink只保证自己的operator以及function的状态)。Flink通过实现akfka connector来达到这样的要求(比如Apache Kafka的offset可以实现这个需求)。
4、对齐(align)步骤可能会增加流处理的延迟。禁止align就变为At Least Once。
5、Exactly_once仅仅是为有stateful operator准备的!换句话说,没有state的operator,也无需Exactly_once。
关于At_Least_Once:
1、不使用brrier的align,一些记录可能会被replay多次。
2、At_Least_Once应对于需要低延时,但对数据的准确性要求并不高的场景。
9.数据恢复
对于unstateful operator,当失败时,只需从最近的一份snapshot开始,利用可重发的数据源source(例如:kafka)重发一次数据即可。
对于stateful operator,我们必须要保证数据被处理的是Exacly Once,因此,当失败时,不仅利用可重发的数据源source(例如:kafka)重发一次数据,并且利用snapshot把stateful operator计算的值恢复到最近的一份snapshot中的值。
checkpoint(snapshot)机制: checkpoint根据时间间隔定期触发,产生snapshot,snapshot中记录有:
1、当前checkpoint开始时数据源(例如Kafka)中消息的offset;
2、记录了所有有state的operator当前的state信息(例如sum中的数值)。
3、可重发的数据源source:Flink选择最近完成的checkpointK。然后系统replay整个分布式的数据流,然后给予每个operator他们在checkpointk的snapshot中的state。数据源被设置为从位置Sk开始重新读取流。例如在Apache Kafka中,那意味着告诉消费者从偏移量Sk开始重新消费。
10.保存点
Svaepoint具有也下几个特点:
1、Savepoint仅仅是一个指向checkpoint的指针; savepoint默认保存在JobManager的memory中,但为了高可用,建议保存到hdfs或RocksDB上(和checkpoint一样在配置文件中配置)。
2、Savepoint是用户人为触发的。它依赖于Flink提供的client,用户可以通过client(CLI)来触发一个savepoint。用户执行触发savepoint操作后,client会通过akka给JobManager发一个消息,JobManager接着通知各TaskManager触发checkpoint。checkpoint触发完成后,TaskManager会执行JobManager的回调,在回调中JobManager会告知触发savepoint的结果(也是通过akka给客户端发消息)。savepoint它不会随着新的已完成的checkpoint产生而自动失效。
3、不同于Checkpoint的是,savepoint并不像checkpoint一样将state作为自己的一部分一并保存。savepoint不存储state,它只通过一个指针指向具体的checkpoint所属的state。
1.背压
在流式处理框架中,Backpressure机制是非常重要的。当接收数据的速度远远超过处理数据速度的时候,我们就需要使用backpressure机制来阻止大量数据的涌入,防止流量陡增快速淤积数据导致的资源耗尽,甚至任务崩溃。
目前主流的流式处理框架 Storm提供了backpressure机制。Storm 是通过监控 Bolt 中的接收队列负载情况,如果超过高watermark(水位值)就会将backpressure信息写到 Zookeeper ,Zookeeper 上的 watch 会通知该topology的所有 Worker 都进入backpressure状态,最后 Spout 停止发送 tuple。
Flink没有使用外部资源(Zookeeper)来处理记录backpressure信息,而是利用本身的结构提供backpressure的功能。Flink的Task和Task之间独立一条TCP(Netty)连接,这样Flink可以直接依靠TCP的反压,可以很好的解决Strom backpressure的震荡问题。
Flink利用自身的buffer pool的物理结构来实现在 Task 之间传输数据以及backpressure的自然降速。
Flink job在运行时,逻辑结构主要由 operators 和 streams 两大组件构成。
Flink是有stream和operator组成。对于这个streams的概念,逻辑上可以理解为是数据流,而实际上streams是由LocalBufferPool中的buffer组成的,而每个buffer中存的是对象序列化后的二进制。每个operator对应的输入流和输出流都对应一个buffer pool,每个buffer pool都是由一定数量的buffer组成。这些buffer可以循环利用。 (可以类比成生产者-消费者模型中的BlockingQueue的原理,一旦队列满了发送者会被阻塞)。
一条记录的处理流程:
1、记录“A”了进入Flink 中。
2、通过Netty读取,InputChannel申请Task1的InputGate对应LocalBufferPool,申请完毕后,InputChannel将“A”记录序列化填到buffer上存到LocalBufferPool中。
3、Task1从InputGate对应LocalBufferPool中取出数据,反序列化成“A”对象,进行etl处理。
4、Task1将申请ResultPartition对应的LocalBufferPool,申请完毕后,将处理结果序列化后填到buffer(也有可能被切割到多个buffer中)上存到LocalBufferPool中。
5、这些buffer首先会被传递给BufferWriter,然后被写到ResultPartition中。一个ResultPartition包括多个ResultSubpartition。
6、那么既然buffer已经进入ResultSubpartition了,ResultSubpartition也就变成可访问的状态了。然后它会通知JobManager。
JobManager得知这个ResultSubpartition已经可以访问了,那么它就会通知下游的task这个数据会被发送到下游task的InputChannel上。
这个Inputchannel被认为是接收这个buffer的,接着通知ResultSubpartition可以初始化一个网络传输了。然后,ResultSubpartition通过TaskManager的网络栈请求该buffer,然后双方基于netty准备进行数据传输。网络连接是在TaskManager之间长时间存在的。
7、通过Netty读取,InputChannel申请Task2的InputGate对应LocalBufferPool,申请完毕后,InputChannel将Task1处理后的记录序列化填到buffer上存到LocalBufferPool中。(一个InputGate包含多个InputChannel)
8、后面的流程重复以上逻辑。
以上处理流程的前提是:必须有在LocalBufferPool中有空闲可用的 Buffer。
本地传输:如果 Task 1线程 和 Task 2 线程运行在同一个TaskManager进程中的不同task slot上。那么Task2可以直接获取Task1的ResultPartition对应的LocalBufferPool中的buffer。
远程传输:如果 Task 1线程 和 Task 2线程运行在不同的TaskManager进程上的task slot中,那么 buffer 会在发送到网络(TCP Channel)后被回收。
Flink官网中有一个backpressure demo。下面这张图显示了:随着时间的改变,生产者(黄色线)和消费者(绿色线)每5秒的平均吞吐与最大吞吐(在单一JVM进程中每秒达到8百万条记录)的百分比。我们通过衡量task每5秒钟处理的记录数来衡量平均吞吐。该实验运行在单 JVM 进程中,不过使用了完整的 Flink 功能栈。
首先,我们运行生产task到它最大生产速度的60%(我们通过Thread.sleep()来模拟降速)。消费者以同样的速度处理数据。然后,我们将消费task的速度降至其最高速度的30%。你就会看到背压问题产生了,正如我们所见,生产者的速度也自然降至其最高速度的30%。接着,停止消费task的人为降速,之后生产者和消费者task都达到了其最大的吞吐。接下来,我们再次将消费者的速度降至30%,pipeline给出了立即响应:生产者的速度也被自动降至30%。最后,我们再次停止限速,两个task也再次恢复100%的速度。总而言之,我们可以看到:生产者和消费者在 pipeline 中的处理都在跟随彼此的吞吐而进行适当的调整,这就是我们希望看到的backpressure的效果。
当 Web UI页面切换到某个 Job 的 Backpressure 页面,会trigger这个 Job backpressure检测,因为backpressure检测还是挺昂贵的, 因此JobManager 会通过 Akka 给每个 TaskManager 发送 Stack Trace 消息。默认情况下,TaskManager 会触发100次 stack trace 采样,每次间隔 50ms。并将这 100 次采样的结果返回给 JobManager,由 JobManager 来计算反压比率(反压出现的次数/采样的次数),最终展现在 Web UI 界面上。Web UI backpressure界面刷新的默认周期是一分钟,目的是不对 TaskManager 造成太大的负担。
Web UI界面job的backpressure的screenshot。该状态意味着JobManager出发了一个运行中的任务的堆栈轨迹取样,默认配置下,该操作需要约5秒。
在此界面,我们点击某行,则会触发对该Operator的所有子任务的取样操作。
1.JVM的缺点
Flink作为一个由java开发并自主实现内存管理而不完全依赖于JVM的内存管理机制。它的优势在于避免频繁GC导致的性能波动, 降低OOM的发生频率。
2.Flink的任务管理器JVM
Network Buffers: 一定数量的32KB大小的MemorySegment(buffer),主要用于数据的网络传输(在backpressure一章中讲过)。在 TaskManager 启动的时候就会分配。默认数量是 2048 个,可以通过flink配置文件中的 taskmanager.network.numberOfBuffers 来配置。
Memory Pool: 由 MemoryManager 管理,由众多MemorySegment组成的超大集合。Flink 中的operator(如 sortuffle/join)会向这个Memory Pool申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回Memory Pool。默认情况下,Memory Pool占了jvm堆内存的70% 的大小。
Remaining (Free) Heap: 这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从GC的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。
3.Flink的内存段
1、在Flink中,并不是将大量对象存在JVM堆上,而是将对象都序列化到一个预分配的内存块最小的内存分配单元MemorySegment(默认32KB)。
2、MemorySegment的底层是byte[],或是堆外 ByteBuffer。
3、Flink流中每条记录都会以序列化的形式存储在一个或多个MemorySegment中。 Flink发送数据时按照MemorySegment内存片来发送,Flink把将要发送的数据序列化到MemorySegment内存片中,当MemorySegment内存片凑满的时候,将此MemorySegment内存片发送出去。Flink将数据直接序列化到受管理的内存byte数组中,业务对象很快被YoungGC掉了,不会导致FullGC频繁。
4、如果Flink Job需要处理的数据超出了内存限制,flink会将部分数据存储到硬盘上。
4.Flink的内存管理
1、减少GC压力:
所有在TaskManager中常驻型的数据都是序列化后的二进制,存在 MemorySegment中, 一直呆在老年代而不会被GC回收。其他的数据对象基本上是由用户代码生成的短生命周期对象,这部分对象可以被 Minor GC 快速回收。只要用户不去创建大量类似缓存的常驻型对象,那么老年代的大小是不会变的,Major GC也就永远不会发生。从而有效地降低了垃圾回收的压力。另外,这里的内存块还可以是堆外内存,这可以使得 JVM 内存更小,从而加速垃圾回收。
2、避免了OOM:
所有的运行时的内存使用只能通过Memory Pool申请内存,保证了其使用的内存大小是固定的,不会因为运行时数据结构和算法而发生OOM。在内存吃紧的情况下,算法(sort/join等)会高效地将一大批MemorySegment写到磁盘,之后再读回来。因此, OutOfMemoryErrors 可以有效地被避免。
3、节省内存空间:
Segment中只存储序列化后的二进制内容,就可以避免存java对象的部分消耗。
4、高效的二进制操作 & 缓存友好的计算。
序列化后的二进制数据以定义好的格式存储,可以高效地比较“与”操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好,可以从 L1/L2/L3 缓存获得性能的提升。
5、Flink默认使用java8, G1垃圾回收。
1.Window的介绍
Event Time, 表示事件的产生事件,也就是event数据自带的一个timestamp字段。
Ingestion Time, 表示数据进入Flink集群的时间,也就是数据进入source operator的时间。
Processing Time, 表示window operator处理时候的服务器时间,属于系统时间,与数据本身无关,也就是在window窗口内计算完成的时间。(默认的Time Characteristic就是Processing Time)
以上三种Time,何时用Event Time,何时用Processing Time,这个要看具体的业务场景。对于Event Time中的Late Element,我们要注意Watermark。
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗window就是从 Streaming 到 Batch 的一个桥梁。
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个Window,用来收集最近一分钟内的数据,并对这个Window内的数据进行计算。
Window按驱动分类:Time Window,例如:每30秒钟。 Count Window,例如:每一百个元素。
Window按数据分类:Tumbling Window,数据无重叠等价于DataFlow中的Fixed Window,Sliding Window,数据有重叠,和Session Window,活动间隙。
Window Case: 淘宝网会记录每个用户每次购买的商品个数,我们要做的是统计不同window中用户购买商品的总数。raw data stream 代表用户的购买行为流,圈中的数字代表该用户本次购买的商品个数,事件是按时间分布的。因为用户的操作是间断的连续(操作几个操作后去浏览内容),所以可以看出事件之间是有time gap的。
1. Tumbling Time Window,统计每一分钟中用户购买的商品的总数。将用户的行为事件按每一分钟进行切分,每隔一分钟产生一个新的Window。Tumbling Time Window能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
2. Sliding Time Window,统计每30秒计算一次最近一分钟用户购买的商品总数。因此这种Window是不间断的,可以平滑地进行窗口聚合。在这种类型的Window中,一个元素可以对应多个窗口。
3. Tumbling Count Window,统计每100个用户购买行为事件统计购买总数。每当Window中填满100个元素了,就会对Window进行计算,并产生一个新的Window。
4. Sliding Count Window,统计每10个元素计算一次最近100个元素的总和。每当Window中填满了10个元素了,就会对Window进行计算,计算近100个元素的总和。
5. Session Window,用户交互事件流中,我们首先想到的是将event聚合到Session Window中(一段用户持续活跃的周期),由非活跃的间隙分隔开。统计每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开。
Window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,Session Gap的,或者是User Defined的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。
2.Window的组件
Flink 中定义一个Window主要需要以下三个组件:
(1)Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。
(2)Trigger:触发器。决定了一个窗口何时能够被计算(fire)或清除(purge),每个窗口都会拥有Trigger。
(3)Evictor:可以译为“驱逐者”。在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。
三个组件都位于一个window operator中,数据流进入window operator,每一个到达的元素都会被交给 WindowAssigner。WindowAssigner 会决定元素被放到哪个或哪些窗口(window)。
每一个Window都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个Window何时能够被计算或清除。每当有元素加入到该Window中,或者之前注册的定时器超时了,那么Trigger都会被调用。Trigger的返回结果可以是 continue(time或count没到,不做任何操作),fire(time或count到了,处理窗口数据), purge(移除窗口和窗口中的数据),或者 fire + purge。对于 event time 来说,Window的fire是要等到大于Window结束时间的 watermark 到达,当watermark没有到,窗口会一直缓存着。所以基于这种机制,可以做到对Late Element乱序消息的支持。一个Window可以被重复计算多次直到它被 purge 了。在purge之前,Window会一直占用着内存。
Flink 对于一些聚合类的Window计算(如sum,min)做了优化,因为聚合类的计算不需要将Window中的所有数据都保存下来,只需要保存一个result值就可以了。每个进入Window的元素都会执行一次聚合函数并修改result值。这样可以大大降低内存的消耗并提升性能。但是如果用户定义了 Evictor,则不会启用对聚合窗口的优化,因为 Evictor 需要遍历窗口中的所有元素,必须要将窗口中所有元素都存下来。
Sliding Count Window,每2个元素计算一次最近4个元素的总和。图中所示的各个窗口逻辑上是不同的窗口,但在物理上是同一个窗口。该滑动计数窗口,trigger的触发条件是元素个数达到2个(每进入2个元素就会触发一次),evictor保留的元素个数是4个,每次计算完窗口总和后会保留剩余的元素。所以第一次触发trigger是当元素5进入,第三次触发trigger是当元素2进入,并驱逐5和2,计算剩余的4个元素的总和(22)并发送出去,保留下2,4,9,7元素供下个逻辑窗口使用。那么窗口工作示意图如下。
Sliding Time Window,每5秒计算最近10秒的元素之和。每个进入窗口的元素根据系统时间分配到(size / slide)个不同的窗口,并会在每个窗口上根据窗口结束时间注册一个定时器(相同窗口只会注册一份),当定时器超时时意味着该窗口完成了,这时会回调对应窗口的Trigger的onProcessingTime方法,返回FIRE_AND_PURGE,也就是会执行窗口计算并清空窗口。如下图所示横轴代表时间戳,第一条record会被分配到[-5,5)和[0,10)两个窗口中,当系统时间到5时,就会计算[-5,5)窗口中的数据,并将结果发送出去,最后清空窗口中的数据,释放该窗口资源。
3.Session窗口
当我们需要分析用户的一段交互的行为事件时,通常的想法是将用户的事件流按照“session”来分组。session 是指一段持续活跃的期间,由活跃间隙分隔开。消息之间的间隔小于sessionGap的,则被分配到同一个Window,间隔大于sessionGap的,则被分配到不同的Window。在Flink 1.1.0 版加入了这种session window 的引擎。
4.Window合并
Flink1.1.0之前,WindowAssigner 负责将元素分配到哪个/哪些Window中去,Trigger 决定了一个Window何时能够被计算或清除。当元素被分配到Window之后,这些Window是固定的不会改变的,而且Window之间不会相互作用。
Flink1.1.0之后开始支持Session Window,基本的思想是这样的: SessionWindows assigner 会为每个进入的元素分配一个窗口,该窗口以元素的时间戳作为起始点,时间戳加会话超时时间为结束点,也就是该窗口为 [timestamp, timestamp+sessionGap)。比如我们现在到了两个元素,它们被分配到两个独立的窗口中,两个窗口目前不相交。当第三个元素进入时,分配到的Window与现有的两个Window发生了叠加。
对于每一个新进入的元素,都会分配一个属于该元素的窗口,都会检查并合并现有的窗口。在触发窗口计算之前,每一次都会检查该窗口是否可以和其他窗口合并,直到trigger触发后,会将该窗口从窗口列表中移除。对于 event time 来说,窗口的触发是要等到大于窗口结束时间的 watermark 到达,当watermark没有到,窗口会一直缓存着。所以基于这种机制,可以做到对乱序消息的支持。
这里做了一个优化:因为每一个新进入的元素都会创建属于该元素的窗口,然后合并。如果新元素连续不断地进来,并且新元素的窗口一直都是可以和之前的窗口重叠合并的,那么其实这里多了很多不必要的创建窗口、合并窗口的操作,我们可以直接将新元素放到那个已存在的窗口,然后扩展该窗口的大小,看起来就像和新元素的窗口合并了一样。
5.事件清单
关于Event Time,需要指出的是:数据产生的时间,编程时首先就是要告诉Flink,哪一列作为Event Time列,同时分配时间戳(TimeStamp)并发出水位线(WaterMark),来跟踪Event Time。简单理解,就是以Event Time列作为时间。水位线既然是用来标记Event Time的,那么Event Time在产生时有可能因为网络或程序错误导致的时间乱序,即Late Element的产生,因此WaterMark分为有序与无序2种。
数据随着时间的流逝而产生,即数据的产生本是升序顺序的,当Flink采用Event Time作为时间模型时,理论上也应该是升序的数据不断的进行计算。但是突然有个“延迟的”数据进入到了Flink,此时时间Time window已过,普通的流式处理框架对于这个“延迟的”数据就不会被正确的计算,但是Flink提供了Watermark机制,虽然Time Window时间已经过了,但是依然不会fire触发计算,一直等待Watermark,直到Watermark到了才去计算。但是终究有一些Very Late Element,实在是太Late了,对于这些Very Late Element,Flink流处理可能无法实时正确计算,因为WarterMark不可能无限制的等待Late Element的到来,所以可以通过之后的批处理(batch)对已经计算的数据进行更正或每个数据自己trigger fire一个window。
6.水印
当我们使用Event Time作为timestamp的时候,一定会有乱序Late Element的可能。于是就有了Time skew的概念。在现实场景中,Processing Time总是落后于Event Time,这就造成了Time Skew。在Flink系统中,借鉴了Google DataFlow的思想,watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
我们以Periodic watemark为例,使用Event Time作为业务timestamp,抽取timestamp生成periodic watermark。event time每3秒触发一次Sliding Time Window。
当第3条数据进入window operator的时候,其watermark已经升至19:34:24秒,正好是最早的一条记录所在window的window_end_time,所以window就被fire触发了。第5条数据进入window operator的时候,其watermark已经升至19:34:27秒,正好是其下一个window的window_end_time。当最后一条数据进入window operator的时候,这是一个Late Element,可以看到watermark时间以上一条相等,不会fire触发Window。
这里做一个说明: window的触发机制,是先按照自然时间将window划分,如果window大小是3秒,那么1分钟内会把window划分成[0,3),[3,6)[6,9)…., window的设定无关数据本身,而是系统定义好了的。
输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。上面的例子中,最后一条数据到达后,其水位线已经升至19:34:24秒,正好是最早的一条记录所在window的window_end_time,所以window就被触发了。
7.Flink的SQL
应用变更的流到动态表:
1、追加模式:流中的每条记录对动态表而言都是一个“插入”更改。因此,流的所有记录都被追加到动态表中。
2、更新模式:流中的记录可以表示在动态表上的一个插入、更新或删除的更改(追加模式可看作更新模式的一个特例)。
从动态表中复原变更日志流:
1、重做模式:流会记录一个修改元素的新值以重现已完成的事务丢失的变更,这张模式相当于只记录变更的结果。
2、重做+撤销模式:流回记录一个修改过的元素的旧值和新值以撤销未完成的事务并重现已完成的事务丢失的变更,这种模式保留了记录变更的过程。
动态表概括了静态表的概念,SQL作为统一的方式来描述在批和流中的数据处理。在流中,SQL的执行是经由动态表来转化的过程。
1.异步I/O
当与外部系统交互时(例如,当用存储在数据库中的数据来丰富流事件时),需要注意与外部系统的通信延迟并不支配流应用程序的总工作。直接访问外部数据库的数据,例如在地图功能,通常意味着同步互动:发送一个请求到数据库和地图功能等到收到响应。在许多情况下,这种等待占了函数时间的绝大部分。与数据库的异步交互意味着单个并行函数实例可以同时处理多个请求,并同时接收响应。这样,等待的时间可以通过与其他请求和接收响应发送。至少,等待时间是在多个请求上进行摊销的。这导致在大多数情况下更高的流吞吐量。
1.监听器
Flink提供了一些基础指标的监控(集群状态、job相关指标)。我们的外部监控系统可以通过向Flink JobManager(port:8081)发送rest请求拿到对应的结果,请求和响应都是JSON格式。
1.Flink在实际应用中的表现
Flink集群在本公司的运行情况。
我分享的内容就到这里,谢谢大家!
Q1: 可以对一个Running的Job做一个动态的扩容吗?
A1: 对于从Kafka读取的数据的Job,可以重新再起一个Job来完成。如果你不想维护两个Job的话,你可以修改zookeeper的并行数量,然后重启zookeeper,重启集群,由于有容错机制,Job会自动重启,并不会出错。
Q2:CheckPonit具体使用场景?
A2:假如一个数据它在开始0秒的时候值是100并做了checkpoint操作,它在10秒的时候它的100个线程对这份数据做操作,其中有1个线程执行操作失败了,那么这份数据都会回到0秒时的执行checkpoint的后状态,并不会保留0到100之间的变化,下一次处理可以直接从这里这个时刻的数据状态开始处理。
Q3:Flink与Redis的怎样结合去使用?
A3:首先不建议把Redis相关的业务逻辑放到Flink的Job里面,应该把业务逻辑都布置到一个单独的服务中,然后结合结Flink的异步IO去请求这个独立的服务完成与Redis数据交互。
【嘉宾介绍】
王剑先生,东北大学硕士,高级Java工程师,现就职于TalkingData,对分布式高并发的后端服务、支付业务开发、大数据流式处理Flink有较深入的研究,曾经在用友软件、Zuora工作,及在Zuora负责Payment与Accounts Receivable的相关开发工作。
本文为海数据社区原创作品,未经授权不得转载,转载请与海数据小秘书联系
扫描“海数据学院”二维码获取更多资讯
以上是关于首发Flink新一代流式计算框架的体系架构及应用的主要内容,如果未能解决你的问题,请参考以下文章
Apache 流框架 Flink,Spark Streaming,Storm对比分析