如何计算Flink集群规模:信封背计算法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何计算Flink集群规模:信封背计算法相关的知识,希望对你有一定的参考价值。

参考技术A 2017年柏林Flink Forward大会上Robert Metger的"Keep It Going: How to Reiably and Efficiently Operate Apache Flink"的演讲很受欢迎。Robert的其中一个主题演讲涉及到了如何估算Flink集群规模。Flink Forward大会的观众们认为这个计算方法对他们很有用,因此我们把他的演讲主题转变成这篇博客。

Flink社区上经常被问起的一个问题是当从开发转到线上时,如何估算集群规模大小。当然,最准确的的答案是根据需要,但是这并没有什么用。这篇博客提出了一系列问题使你能够计算出一些基准。

首先,思考一下你的应用操作需要的资源基准的指标。

关键指标如下:

最后,考虑一下你的服务等级协议(SLAS),比如宕机时间、延迟和最大的吞吐量。这些指标将直接影响你的容量计算。

接下来,看一下基于预算可用的资源大小。比如:

基于上述这些因素,你现在能够估算正常流程的的资源基准。另外,还需要增加一些资源用作异常的恢复和checkpointing。

我现在通过一个集群上的虚拟job部署来描述整个资源基准的建立过程。信封背计算法的所用到的数字是不精准的,同时并没有考虑的很全面。在后面,我会指出在做计算时的忽视的一些点。

在这个案列中,我将部署典型的Flink流式应用,Kafka Topic的数据作为数据源。这个流接着使用keyed, aggregating window操作转换。窗口操作执行5分钟的窗口聚合。同时假设有源源不断的数据进来,window被设置成1分钟滑动一次。

这表示每分钟执行一次过去5分钟内的窗口聚合。这个流式应用根据userId字段进行聚合。Kafka Topic中消息的大小平均是2KB。

吞吐量是每秒100万条消息。为了理解窗口操作的state大小,你需要知道distinct Keys的数量,就是userIds的数量,这边大约是5000万个不同的用户ID。对于每个用户,你需要计算4个数字,通过longs(8 byte)存储。

现在,让我们总结一下这个任务的关键指标:

— 硬件:

— Kafka 独立部署

总共有5台机器运行这个job,每台机器上运行一个TaskManager。磁盘通过网络挂载,同时有10 gigabit的以太网接入。同时Kafka是独立部署在其他机器上。

每台机器有16CPU核。为了简化的需要,这边不考虑CPU和内存的使用情况。在实际情况下,你需要根据应用逻辑和state backend的使用,来考虑内存的使用。这个例子使用RocksDB state backend。(它是健壮的,同时对内存需求比较低)。

为了理解整个job运行部署的资源需求,最容易的方式是关注单台机器和TaskManager的操作。你可以通过单台机器计算出来的数字来推断整个集群的资源需求。

默认(所有的操作都有并行度和没有特殊的调度限制)所有的操作在每台机器上都有运行。

在这个例子中,Kafka source, 窗口操作和Kafka sink都运行在每台机器上。

keyBy是一个分离的操作,因此资源需求计算比较容易。在现实中,keyBy是一个API,连接了Kafak Source和窗口操作。

我现在将从头到底理解这些操作的网络资源需求。

为了计算Kafka source收到的数据量,首先需要计算Kafka的聚合输入。sources每秒收到100万消息,每条消息2KB大小。

2GB/s除以5台机器,得到如下结果:

集群中每台机器上TaskManager的source收到400MB/s的数据。

接下来,你需要确保同一个key的所有事件落在一些机器上。这边,你从kafka中读取的数据可能被重新分区。
shuffle过程发送所有拥有相同key的数据到同一台机器,因此这边把400MB/s的数据分割成一个根据userId分区的流。

平均来看,你将发送80MB/s的数据到每一台机器。这个分析是从单台机器的角度,但是一些数据已经在目标机器上了,因此要减去80MB/s。

接下去的问题是窗口操作发送多少数据到Kafka Sink。结果是67MB/s,让我们看一下如何计算。
窗口操作为每个key保持了4个数字(longs)聚合。每一分钟,操作将发送当前的聚合值。每个key发送2ints(user_id, window_ts)和4 longs。

然后乘以keys数量(500000000除以机器数量)

然后计算每秒的大小:

这表示每个TaskManager从窗口操作中平均发送67MB/s的用户数据。因为Kafka sink运行在每个TaskManager上,所以没有进一步的分区操作。这就是从Flink到Kafka的发送的数据量。

从窗口操作中得到的数据每分钟会发送一次。在实际中,这个操作不会发以67MB/s的发送数据,而是在一分钟之内的几秒间到达最大带宽。

现在,总结一下:

到目前为止,我们仅仅计算了Flink处理的用户数据。你同时还需要考虑磁盘的使用,比如存储state 和checkpointing。为了计算磁盘的花销,你需要查看窗口计算如何进入state。Kafka Source也需要保持一些state,但是跟窗口操作的state相比,可以忽略不计。

为了理解窗口操作的state大小,让我们换一个角度看这个问题。Flink计算5分钟的时间窗口,并且1分钟滑动一次。Flink是通过保持5个窗口来实现滑动窗口。根据先前提到的,在使用窗口时,你需要为每个窗口保持40bytes的状态,并且窗口是提前聚合的。对于每一条到来的事件,你首先需要取出当前聚合值,再更新聚合值,然后把新值写回去。

这意味着:

有40MB/s的磁盘读写(每台机器上)。根据先前说的,磁盘是通过网络挂载的。因此需要在先前的基础上增加这个值。

现在总共需要的资源如下:

上述的计算只考虑了事件到达窗口操作时触发时state的进入。此外,你还需要checkpoint和容错机制。因为,如果一台机器或者其他任何东西挂掉,你需要恢复你的窗口并继续处理。

根据先前所说,Checkpointing是每隔1分钟执行一次,并且每个checkpoint会复制整个job的状态到(通过网络挂载)文件系统。

现在,让我们快速的计算一下每台机器的state大小:

接着算每秒的值:

和窗口操作类似,checkpointing也是每分钟执行一次。它尝试全速发送数据到外部存储。Checkpointing引起了额外的state进入。(自从Flink1.3后,RocksDB支持增量checkpointing来降低每次checkpoint时所需的网络传输。)

计算更新如下:

这意味着整个集群网络流量是:

400是80MB的state读写乘以5台机器。2335是Kafka进和出的总值。

整个硬件的网络可用容量如下:

免责声明:上述这些计算没有包含协议的花销,比如TCP、Ethernet和RPC(在Flink、Kafka和HDFS等中)。但是上述的计算仍旧对如何计算一个job的资源有指导意义。

基于我的分析,这个例子中,5个节点的集群,在典型的操作中,每个机器需要处理760MB/s的数据进出,同时每台机器可以处理的容量是1250MB/s。这样保留了40%的网络容量来应对我刚才提到的复杂度,比如网络协议花销,事件重放,数据倾斜引起的不平均的负载等。

当然,没有一个标准答案来说明留40%的余量是否合适。但是这个算法可以给你一个好的开始。尝试上述的计算,修改上述的参数为你自己的参数。Happy scaling!

How To Size Your Apache Flink® Cluster: A Back-of-the-Envelope Calculation

以上是关于如何计算Flink集群规模:信封背计算法的主要内容,如果未能解决你的问题,请参考以下文章

Flink实战之StreamingFileSink如何写数据到其它HA的Hadoop集群

集群规模计算

Flink流计算开发

Flink 入门

flink01

FlinkWhat is Apache Flink?