Flink 源码:从 KeyGroup 到 Rescale

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 源码:从 KeyGroup 到 Rescale相关的知识,希望对你有一定的参考价值。

参考技术A 本文仅为笔者平日学习记录之用,侵删
原文: https://mp.weixin.qq.com/s/J9MNkJ6uwNW7AReELTQeTw

通过阅读本文你能 get 到以下点:

Flink 中 KeyedState 恢复时,是按照 KeyGroup 为最小单元恢复的,每个 KeyGroup 负责一部分 key 的数据。这里的 key 指的就是 Flink 中 keyBy 中提取的 key。

每个 Flink 的 subtask 负责一部分相邻 KeyGroup 的数据,即一个 KeyGroupRange 的数据,有个 start 和 end(这里是闭区间)。

看到这里可能有点蒙,没关系后面有例子帮助读者理解这两个概念。

maxParallelism 表示当前算子设置的 maxParallelism,而不是 Flink 任务的并行度。maxParallelism 为 KeyGroup 的个数。

当设置算子的并行度大于 maxParallelism 时,有些并行度就分配不到 KeyGroup,此时 Flink 任务是无法从 Checkpoint 处恢复的。

如果设置了,就是设定的值。当然设置了,也需要检测合法性。如下图所示,Flink 要求 maxParallelism 应该介于 1 到 Short.MAX_VALUE 之间。

如果没有设置,则 Flink 引擎会自动通过 KeyGroupRangeAssignment 类的 computeDefaultMaxParallelism 方法计算得出,computeDefaultMaxParallelism 源码如下所示:

computeDefaultMaxParallelism 会根据算子的并行度计算 maxParallelism,计算规则:将算子并行度 * 1.5 后,向上取整到 2 的 n 次幂,同时保证计算的结果在最小值和最大值之间。

最小值 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 是 2 的 7 次方 = 128。

最大值 UPPER_BOUND_MAX_PARALLELISM 是 2 的 15 次方 = 32768。

即:Flink 自动生成的 maxParallelism 介于 128 和 32768 之间。

新开发的 Job 业务数据量较小,所以初期设置的并行度也会很小。同时没有给每个 Job 主动设置 maxParallelism,根据上面的规则,Flink 自动生成的 maxParallelism 为 128,后期随着业务数据量暴涨,当 Job 的并发数调大 128 以上时,发现 Job 无法从 Checkpoint 或 Savepoint 中恢复了,这就是所谓的 "并发调不上去了"。当然可以选择不从状态恢复,选择直接启动的方式去启动任务。但是有些 Flink 任务对状态是强依赖的,即:必须从 State 中恢复,对于这样的 Job 就不好办了。

所以按照开发规范,应该结合业务场景主动为每个 Job 设置合理的 maxParallelism,防止出现类似情况。

根据 key 计算其对应的 subtaskIndex,即应该分配给哪个 subtask 运行,计算过程包括以下两步,源码都在相应的 KeyGroupRangeAssignment 类中:

computeKeyGroupForKeyHash 源码如下所示:

根据 Key 的 hash 值进行 murmurHash 后,对 maxParallelism 进行求余,就是对应的 KeyGroupIndex。

computeOperatorIndexForKeyGroup 源码如下所示:

假如 maxParallelism 为 50,parallelism 为 10,那么数据是如何分布的?

MathUtils.murmurHash(key.hashCode()) % maxParallelism:所有 key 的 hashCode 通过 Murmurhash 对 50 求余得到的范围为 0~49,也就是说:总共有 keyGroupId 为 0~49 的这 50 个 KeyGroup。

subtask 与 KeyGroupId 对应关系:

这里我们看到了每个 subtask 对应一个 KeyGroupRange 的数据,且是闭区间。

计算某个并行度上负载哪些 KeyGroup?等价于求某个 subtask 负载的 KeyGroupRange。

在 KeyGroupRangeAssignment 类中有 computeKeyGroupRangeForOperatorIndex 方法可以完成这个操作:

如下图所示是 Flink 依赖 KeyGroup 修改并发的 Rescale 过程(并发度从 3 改成 4):

由图中可得知 key 的范围是 0~19, maxParallelism = 10。

0->0,10 表示 key 为 0 和 10 的数据,对应的 KeyGroupId 为 0。

1->1,11 表示 key 为 1 和 11 的数据,对应的 KeyGroupId 为 1。

以此类推。。。

并发度是 3:

并发度是 4:

KeyGroup 的数量为 maxParallelism,一旦 maxParallelism 变了,说明 KeyGroup 的分组完全变了,而 KeyedState 恢复是以 KeyGroup 为最小单元的,所以 maxParallelism 改变后,任务将无法恢复。在 Checkpoint 恢复过程中也会对新旧 Job 的 maxParallelism 进行检查匹配,如果某个算子的 maxParallelism 变了,则任务将不能恢复。

本文主要介绍了 KeyGroup、KeyGroupRange 和 maxParallelism 的一些概念,及他们之间的关系。最后讲述了改并发的情况状态的 Rescale 流程。其实在 Flink 内部不只是状态恢复时需要用到 KeyGroup,数据 keyBy 后进行 shuffle 数据传输时也需要按照 KeyGroup 的规则来将分配数据,将数据分发到对应的 subtask 上。

本文比较简单,主要是为后续 State 恢复流程做一个铺垫。

以上是关于Flink 源码:从 KeyGroup 到 Rescale的主要内容,如果未能解决你的问题,请参考以下文章

Flink 的键组 KeyGroup 与 缩放 Rescale

Flink 的键组 KeyGroup 与 缩放 Rescale

Flink源码分析

FlinkFlink中Key Groups与最大并行度

FlinkFlink状态的缩放(rescale)与键组(Key Group)设计

Flink 源码解析 —— 项目结构一览