数据倾斜导致子任务积压
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据倾斜导致子任务积压相关的知识,希望对你有一定的参考价值。
参考技术A 业务背景:一个流程中,有两个重要子任务:一是数据迁移,将kafka实时数据落Es,二是将kafka数据做窗口聚合落hbase,两个子任务接的是同一个Topic GroupId。上游Topic的 tps高峰达到5-6w。
问题描述:
给 24个 TaskManager(CPU) 都会出现来不及消费的情况。
问题原因:
做窗口聚合的任务的分组字段,分组粒度太小,hash不能打散,数据倾斜严重,导致少数TaskManager上压力过大,从而影响落Es的效率,导致背压。
解决方式:
将两个任务独立开来,作为不同的流程。
结果:
修改之前24个TaskManager(CPU) 来不及消费,改完之后20个CPU可完成任务。Kafka实时数据落Es的16个TaskManager,将kafka数据做窗口聚合落hbase的4个TaskManager。
另:
同样的数据、同样的Tps作为数据输入,Hbase的输出能力远超过Es,考虑实时任务落数据进Es要慎重。
Flink任务落Es时要考虑设置微批落数据,设置bulk.flush.max.actions和bulk.flush.interval.ms至合适值,否则影响吞吐量。
Flink并行度设置导致的负载倾斜 Key Groups 不均衡 计算最大并行度
1.概述
我们从一个简单的问题开始:在 Flink UI 中调查某个作业的子任务时,关于每个子任务处理的数据量,你可能会遇到如下这种奇怪的情况。
每个子任务的工作负载并不均衡
这表明每个子任务的算子没有收到相同数量的 Key Groups,它代表所有可能的 key 的一部分。如果一个算子收到了 1 个 Key Group,而另外一个算子收到了 2 个,则第二个子任务很可能需要完成两倍的工作。查看 Flink 的代码,我们可以找到以下函数:
* // 根据 maxParallelism、算子的并行度 parallelism 和 keyGroupId,
* // 计算 keyGroupId 对应的 subtask 的 index
*/
public static int computeOperatorIndexForKeyGroup(
int maxParallelism, int parallelism, int keyGroupId)
return keyGroupId * parallelism / maxParallelism;
其目的是将所有 Key Groups 分发给实际的算子。Key Groups 的总数由 maxParallelism 参数决定,而算子的数量和 parallelism 相同。这里最大的问题是 maxParallelism 的默认值,它默认等于 operatorParallelism + (operatorParallelism / 2) [4]。假如我们设置 parallelism 为10,那么 maxParallelism 为 15 (实际最大并发度值的下限是 128 ,上限是 32768,这里只是为了方便举例)。这样,根据上面的函数,我们可以计算出哪些算子会分配给哪些 Key Group。
在默认配置下,部分算子分配了两个 Key Group,部分算子只分配了 1 个
解决这个问题非常容易:设置并发度的时候,还要为 maxParallelism 设置一个值,且该值为 parallelism 的倍数。这将让负载更加均衡,同时方便以后扩展
。
搜嘎斯奈,我说我们公司的代码有一段是计算最大并行度的,然后设置了这个值,当初一直没找到相关的解释,也没去问,实在是不该呀。
以上是关于数据倾斜导致子任务积压的主要内容,如果未能解决你的问题,请参考以下文章