数据倾斜导致子任务积压

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据倾斜导致子任务积压相关的知识,希望对你有一定的参考价值。

参考技术A 业务背景:

一个流程中,有两个重要子任务:一是数据迁移,将kafka实时数据落Es,二是将kafka数据做窗口聚合落hbase,两个子任务接的是同一个Topic GroupId。上游Topic的 tps高峰达到5-6w。

问题描述:

给 24个 TaskManager(CPU) 都会出现来不及消费的情况。

问题原因:

做窗口聚合的任务的分组字段,分组粒度太小,hash不能打散,数据倾斜严重,导致少数TaskManager上压力过大,从而影响落Es的效率,导致背压。

解决方式:

将两个任务独立开来,作为不同的流程。

结果:

修改之前24个TaskManager(CPU) 来不及消费,改完之后20个CPU可完成任务。Kafka实时数据落Es的16个TaskManager,将kafka数据做窗口聚合落hbase的4个TaskManager。

另:

同样的数据、同样的Tps作为数据输入,Hbase的输出能力远超过Es,考虑实时任务落数据进Es要慎重。

Flink任务落Es时要考虑设置微批落数据,设置bulk.flush.max.actions和bulk.flush.interval.ms至合适值,否则影响吞吐量。

Flink并行度设置导致的负载倾斜 Key Groups 不均衡 计算最大并行度

1.概述

转载:使用 Flink 前需要知道的 10 个『陷阱』

我们从一个简单的问题开始:在 Flink UI 中调查某个作业的子任务时,关于每个子任务处理的数据量,你可能会遇到如下这种奇怪的情况。


每个子任务的工作负载并不均衡

这表明每个子任务的算子没有收到相同数量的 Key Groups,它代表所有可能的 key 的一部分。如果一个算子收到了 1 个 Key Group,而另外一个算子收到了 2 个,则第二个子任务很可能需要完成两倍的工作。查看 Flink 的代码,我们可以找到以下函数:

   * // 根据 maxParallelism、算子的并行度 parallelism 和 keyGroupId,
     * // 计算 keyGroupId 对应的 subtask 的 index
     */
    public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) 
        return keyGroupId * parallelism / maxParallelism;
    

其目的是将所有 Key Groups 分发给实际的算子。Key Groups 的总数由 maxParallelism 参数决定,而算子的数量和 parallelism 相同。这里最大的问题是 maxParallelism 的默认值,它默认等于 operatorParallelism + (operatorParallelism / 2) [4]。假如我们设置 parallelism 为10,那么 maxParallelism 为 15 (实际最大并发度值的下限是 128 ,上限是 32768,这里只是为了方便举例)。这样,根据上面的函数,我们可以计算出哪些算子会分配给哪些 Key Group。

在默认配置下,部分算子分配了两个 Key Group,部分算子只分配了 1 个

解决这个问题非常容易:设置并发度的时候,还要为 maxParallelism 设置一个值,且该值为 parallelism 的倍数。这将让负载更加均衡,同时方便以后扩展

搜嘎斯奈,我说我们公司的代码有一段是计算最大并行度的,然后设置了这个值,当初一直没找到相关的解释,也没去问,实在是不该呀。

以上是关于数据倾斜导致子任务积压的主要内容,如果未能解决你的问题,请参考以下文章

Spark(19)——数据倾斜问题

大数据SQL优化之数据倾斜解决案例全集

Hive 数据倾斜

大数据SQL优化之数据倾斜解决案例全集

16Hive数据倾斜与解决方案

hive数据倾斜及处理