DataSkew —— 数据倾斜问题解析及解决方案实践总结小记

Posted 扫地增

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DataSkew —— 数据倾斜问题解析及解决方案实践总结小记相关的知识,希望对你有一定的参考价值。

文章目录

注意,要区分开数据倾斜与数据量过量这两种情况,数据倾斜是指少数task被分配了绝大多数的数据,因此少数task运行缓慢;数据过量是指所有task被分配的数据量都很大,相差不多,所有task都运行缓慢。

什么是数据倾斜

简单的讲,数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢。
hash的时候,如果有一个值特别多,那么hash的操作是均分不掉数据的,只能放到一个机器上去处理,这就是数据倾斜。
相信大部分做过数据的朋友都会遇到数据倾斜,数据倾斜会发生在数据开发的各个环节中,比如:

  • Hive算数据的时候reduce阶段卡在99.99%
  • 用SparkStreaming做实时算法时候,一直会有executor出现OOM的错误,但是其余的executor内存使用率却很低

数据倾斜有一个关键因素是数据量大,可以达到千亿级。其实所谓的数据倾斜就是数据分布不均匀。

数据倾斜的现象

Hadoop中的数据倾斜

Hadoop中直接贴近用户使用使用的时Mapreduce程序和Hive程序,虽说Hive最后也是用MR来执行(至少目前Hive内存计算并不普及),但是毕竟写的内容逻辑区别很大,一个是程序,一个是Sql,因此这里稍作区分。Hadoop中的数据倾斜主要表现在ruduce阶段卡在99.99%,一直99.99%不能结束。

这里如果详细的看日志或者和监控界面的话会发现:

  • 有一个多几个reduce卡住
  • 各种container报错OOM
  • 读写的数据量极大,至少远远超过其它正常的reduce

伴随着数据倾斜,会出现任务被kill等各种诡异的表现。

Spark中的数据倾斜

Spark中的数据倾斜也很常见,这里包括Spark StreamingSpark Sql,表现主要有下面几种:

  • Executor lostOOMShuffle过程出错
  • Driver OOM
  • 单个Executor执行时间特别久,整体任务卡在某个阶段不能结束
  • 正常运行的任务突然失败

补充一下,在Spark streaming程序中,数据倾斜更容易出现特别是在程序中包含一些类似sql的join、group这种操作的时候因为Spark Streaming程序在运行的时候,我们一般不会分配特别多的内存,因此一旦在这个过程中出现一些数据倾斜,就十分容易造成OOM。

Hive中的数据倾斜

任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。 最长时长远大于平均时长。
经验: Hive的数据倾斜,一般都发生在SqlGroup ByJoin On上,而且和数据逻辑绑定比较深。

数据倾斜产生的原因

Hive数仓为何会出现DateSkew

数仓中发生数据倾斜的根本原因就是:某值的的数量过多。导致谋职的数量过多的原因有三个。
产生原因:

  • key 分布不均匀
  • 业务数据本身的特性
  • 空值,大量的游客用户登录访问。
  • 数据类型不匹配,使用从mysql数据导过来的数据和hive数仓里的数据做联合查询的时候,可能某个字段mysql里的是int类型,hive里是string类型,这些string类型的数据就会积压在一起。
  • 表结构本身有问题,比如,这个地区字段,都是市,北京市是一个值,太原市是一个值,到时候肯定会北京市的数据量特别大导致数据倾斜。这个例子比较极端,只是想说地区人口差异很大,数据量也会有很大差距。

Spark为何会出现DateSkew

数据倾斜的原因:

出现数据倾斜的原因,基本只可能是因为发生了shuffle操作,在shuffle的过程中,出现了数据倾斜的问题。因为某个或者某些key对应的数据,远远的高于其他的keyShuffle数据之后导致数据分布不均匀,但是所有节点的机器的性能都是一样的,程序也是一样的,就是数据量不一致,所以决定了task的执行时长就被数据量决定了。

数据分区的策略:

  • 随机分区: 每一个数据分配的任意一个分区的概率是均等的
  • Hash分区: 使用数据的Hash分区值,%分区数。(导致数据倾斜的原因)
  • 范围分区: 将数据范围划分,数据分配到不同的范围中(分布式的全局排序)

定位数据倾斜问题

  1. 查阅代码中会产生shuffle的算子,例如distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等算子,根据代码逻辑判断此处是否会出现数据倾斜。
  2. 查看Spark作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage(哪一个stage生成的task特别慢),通过stage定位到对应的shuffle算子是哪一个,从而确定是什么地方发生数据倾斜。

查看数据倾斜的key的分布情况:

//使用spark中的抽样算子sample,查看相应的key的分布
val sampledPairs = pairs.sample(false, 0.1)  //抽样
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

数据倾斜产生的原理

如果大家对MapReduceshuffle过程或者Sparkshuffle很熟悉的话完全不用看这个图的,因为图片也是笔者随便找的本不想放在这里。多说一句以MR为例就是由于shuffle后数据经过HashPartitioner进行分组后形成数据集的数量均衡造成的。数据分组效果大致如下图:
在这里插入图片描述

产生数据倾斜的操作

关键操作情形结果
Join其中一个表较小,但是key集中分发到某一个或几个Reduce上的数据远高于平均值
大表与大表,但是分桶的判断字段0值或者空值过多这些空值都由一个redice处理,很慢
group bygroup by维度过小,某值得数量过多处理某值的reduce十分耗时
Count(DISTINCT XXX)某特殊值过多处理此特殊值的reduce十分耗时
reduceByKey某值得数量过多处理某值的reduce十分耗时
countByKey某值得数量过多处理某值的reduce十分耗时
groupByKey某值得数量过多处理某值的reduce十分耗时

不同情形倾斜数据处理方案

Hql和SparkSql中处理

Join

小表join大表

主要考虑使用Map Join代替Reduce Join
使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce
由于笔者之前总结过map join如何使用,这里就不过多陈述。可以参照hive sql数据倾斜——大表join小表如何使用map join

大表join大表skewjoin
  • key存在无效值时:
    空key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。处理方式如下:
SELECT *
FROM log a
LEFT JOIN bmw_users b 
ON CASE WHEN a.user_id IS NULL THEN concat(‘dp_hive’,rand()) >ELSE a.user_id END = b.user_id;
  • key值都是有效值时
    可使用hive配置:
-- 指定是否开启数据倾斜的join运行时优化,默认不开启即false。
set hive.optimize.skewjoin=true;
-- 判断数据倾斜的阈值,如果在join中发现同样的key超过该值,则认为是该key是倾斜key。默认100000。
-- 一般可以设置成处理的总记录数/reduce个数的2-4倍。
set hive.skewjoin.key=100000;
-- 指定是否开启数据倾斜的join编译时优化,默认不开启即false。
set hive.optimize.skewjoin.compiletime=true;

具体来说,会基于存储在原数据中的倾斜key,在编译时为导致倾斜的key单独创建执行计划,而其他key也有一个执行计划用来join。然后,对上面生成的两个join执行后求并集。因此,除非相同的倾斜key同时存在于这两个join表中,否则对于引起倾斜的keyjoin就会优化为map-side join。此外,该参数与hive.optimize.skewjoin之间的主要区别在于,此参数使用存储在metastore中的倾斜信息在编译时来优化执行计划。如果元数据中没有倾斜信息,则此参数无效。一般可将这两个参数都设为true。如果元数据中有倾斜信息,则hive.optimize.skewjoin不做任何操作。

数据类型不匹配

数据类型不匹配,可以直接把两个表的主键直接统一。

select * 
from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
关联主键含有大量空key

这种情况需要按照场景进行处理,分为单条件联合多条件联合查询。如果是多条件联合查询可以将多个主键进行拼接产生新的主键来尽可能避免主键空key的情况,当然这种情况需要与业务数据紧密结合,尽可能多的确保数据中多个关连键有一个不为空,是的空key数量保持在正常分组水平,也可以极大提高查询性能。如果实在不能确保那么我们应该对每一个关联主键按照单条件关联的方式进行处理。实例如下:

--发生数据倾斜的SQL
select *
from advertisement_log log
left join user_center user
on log.open_id = user.open_id
and log.union_id = user.union_id

--调整后的sql
select *
from advertisement_log log
left join user_center user
on concat_ws('_',log.open_id,log.union_id) = concat_ws('_',user.open_id,user.union_id)

如果是单条件联合查询建议使用如下实例两个方式:

解决方式1:user_id为空的不参与关联

SELECT *
FROM log a
JOIN bmw_users b
  ON a.user_id IS NOT NULL
 AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;

解决方式2:赋与空值分新的key值

SELECT *
FROM log a
LEFT JOIN bmw_users b 
ON CASE WHEN a.user_id IS NULL THEN concat(‘dp_hive’,rand()) ELSE a.user_id END = b.user_id;

结论:

方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1 log表被读取了两次,jobs2。这个适合优化无效 id(比如-99 , ’’, null等) 产生的倾斜问题。把空key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

关联主键中少部分key的数据量巨大

在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

这种情况笔者是在对日志表表数据做处理时遇到的,主要是不同埋点的日志数据,热埋点和冷埋点被点击的数据量偏差非常巨大,相差四五个数据级。笔者是按照需求设计埋点表字典表,按照数据量的级别对埋点进行划分,分别进行join,最后使用union all将数据进行合并。这种处理方式需要对数据进行深入的分析,但是处理比较简单有效,原理和空key的解决方式1类似。实例如下:

字典表:

CREATE EXTERNAL TABLE dim.event_v1`
(
   `event_id` STRING COMMENT '事件ID', 
   `describe` STRING COMMENT '事件(埋点)描述', 
   `district` STRING COMMENT '事件区域', 
   `part` STRING COMMENT '事件所属部分', 
   `action` STRING COMMENT '行为[点击,展示]', 
   `work_table` STRING COMMENT '使用表'
)
STORED AS textfile
LOCATION '/big-data/dim/event_v1';

解决方式:

SELECT /*+MAPJOIN(event)*/
   user_id,
   app_id,
   provice,
   city,
   district
FROM fct.log log
JOIN (
     SELECT 
     FROM dim.event_1
     WHERE work_table = 'big_event_log'
)event
ON log.event_id = event.event_id
UNION ALL
SELECT /*+MAPJOIN(event)*/
   user_id,
   app_id,
   provice,
   city,
   district
FROM fct.log log
JOIN (
     SELECT 
     FROM dim.event_1
     WHERE work_table = 'small_event_log'
)event
ON log.event_id = event.event_id
Join驱动表选取和数据量优化:
  1. 关于驱动表的选取,选用join key分布最均匀的表作为驱动表
  2. 做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
left semi join

这种join的应用场景主要是用来代替sql中的in,来提升性能,使用于大表join小表的一些场景中,如果关联的结果只需要保留左表中的字段数据且重复数据在结果中只需要出现一次,那么这个时候就可以使用left semi join来避免因为关联主键某些值大量出现引起的倾斜。在后续的文章中笔者会对left semi join用法进行总结。

Join 倾斜总结

关于join的倾斜总结目前笔者遇到基本就是这些,如果后续遇到其他情况笔者在更。

Count(DISTINCT XXX)

count distinct时,会将值为空的情况单独处理,比如可以直接过滤空值的行,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union
能先进行 group操作的时候先进行group操作,把key先进行一次 reduce,之后再进行 count或者distinct count 操作。

group by

注:group by优于distinct group
采用sum() + group by的方式来替换count(distinct)完成计算。

增加Reuducer个数

默认是由参数hive.exec.reducers.bytes.per.reducer来推断需要的Reducer个数。可通过mapred.reduce.tasks控制。

调优

hive.map.aggr=true
--开启map端combiner
set hive.map.aggr=true

开启map combiner。在map中会做部分聚集操作,效率更高但需要更多的内存。
假如map各条数据基本上不一样, 聚合没什么意义,做combiner反而画蛇添足,hive里也考虑的比较周到通过参数可以进行相关的设置:

hive.groupby.mapaggr.checkinterval = 100000 (默认)
hive.map.aggr.hash.min.reduction=0.5(默认)
hive.groupby.skewindata=true
--开启数据倾斜时负载均衡
set hive.groupby.skewindata=true;

就是先随机分发并处理,再按照key group by来分发处理。
当选项设定为true,生成的查询计划会有两个MRJob

  • 第一个MRJob中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;
  • 第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的原始GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
    它使计算变成了两个mapreduce,先在第一个中在shuffle过程 partition 时随机给 key 打标记,使每个key随机均匀分布到各个 reduce 上计算,但是这样只能完成部分计算,因为相同key没有分配到相同reduce上。所以需要第二次的mapreduce,这次就回归正常shuffle,但是数据分布不均匀的问题在第一次mapreduce已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次mr中随机分布到各个节点完成。

增加并行度

  • 场景: 两个大表,数据分布均匀,为了提高效率,使用mapjoin,采用切分大表的方法。
  • 方法: 采用将大表切分为小表,然后进行连接。

原始测试表

+----------+------------+
| test.id  | test.name  |
+----------+------------+
| 1        | aa         |
| 2        | bb         |
| 3        | cc         |
| 4        | dd         |
+----------+------------+

将其切分为两个:

 select * from test tablesample(bucket 1 out of 2 on id);

+----------+------------+
| test.id  | test.name  |
+----------+------------+
| 2        | bb         |
| 4        | dd         |
+----------+------------+
 select * from test tablesample(bucket 2 out of 2 on id);

+----------+------------+
| test.id  | test.name  |
+----------+------------+
| 1        | aa         |
| 3        | cc         |
+----------+------------+

切分为四个:

select * from test tablesample(bucket 1 out of 4 on id);

+----------+------------+
| test.id  | test.name  |
+----------+------------+
| 4        | dd         |
+----------+------------+
select * from test tablesample(bucket 2 out of 4 on id);
+----------+------------+
| test.id  | test.name  |
+----------+------------+
| 1        | aa         |
+----------+------------+
select * from test tablesample(bucket 3 out of 4 on id);

+----------+------------+
| test.id  | test.name  |
+----------+------------+
| 2        | bb         |
+----------+------------+
select * from test tablesample(bucket 4 out of 4 on id);
+----------+------------+
| test.id  | test.name  |
+----------+------------+
| 3        | cc         |
+----------+------------+

tablesample(bucket 3 out of 4 on id),其中tablesample为关键字,bucket关键字,3为要去的分表,4为拆分表的数目,id拆分依据。

*多表 union all 会优化成一个 job

推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。

SELECT *
FROM effect a
JOIN (
      SELECT auction_id AS auction_id
      FROM auctions
      UNION ALL
      SELECT auction_string_id AS auction_id
      FROM auctions
     ) b
ON a.auction_id = b.auction_id;

这样子比分别过滤数字 id,字符串 id,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个<key,value> 对,<t,数字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 读只会是一次。

消灭子查询内的 group by

原写法:

SELECT *
FROM (
     SELECT *
     FROM t1
     GROUP BY c1, c2, c3
     UNION ALL                                                                                                             
     SELECT *
     FROM t2
     GROUP BY c1, c2, c3
     ) t3
GROUP BY c1, c2, c3;

优化写法:

SELECT *
FROM (
      SELECT *
      FROM t1
      UNION ALL
      SELECT *
      FROM t2
     ) t3
GROUP BY c1, c2, c3;

从业务逻辑上说,子查询内的 group by 功能与外层的 group by 重复,除非子查询内有 count(distinct)。经过测试,并未出现 union allhive bug,数据是一致的。MR 的作业数由3减少到1t1相当于一个目录,t2相当于一个目录,对map reduce程序来说,t1,t2可以做为 map reduce 作业的 mutli inputs。这可以通过一个 map reduce 来解决这个问题。Hadoop的计算框架,不怕数据多,怕作业数多。

消灭子查询内的count(distinct),max,min

原写法:

SELECT c1, c2, c3, sum(pv)
FROM (
    SELECT c1, c2, c3, COUNT(c4)
    FROM t1
    GROUP BY c1, c2, c3
    UNION ALL
    SELECT c1, c2, c3, COUNT(DISTINCT c4)
    FROM t2
    GROUP BY c1, c2, c3
) t3
GROUP BY c1, c2, c3;

这种我们不能直接uniongroup by,因为其中有一个表的操作用到了去重,这种情况,我们可以通过建立临时表来消灭这种数据倾斜问题。

优化写法:

INSERT INTO t4
SELECT c1, c2, c3, COUNT(DISTINCT c4)
FROM t2
GROUP BY c1, c2, c3;

SELECT c1, c2, c3, SUM(pv)
FROM (
    SELECT c1, c2, c3, COUNT(c4)
    FROM t1
    UNION ALL
    SELECT *
    FROM t4
) t3
GROUP BY c1, c2, c3;

reduce的时间过长

假设一个memberid对应的log里有很多数据,那么最后合并的时候,也是十分耗时的,所以,这里需要找到一个方法来解决这种reduce分配不均的问题。

解决方法:

SELECT *
FROM log a
LEFT JOIN (
           SELECT memberid, number
           FROM users d
           JOIN num e
        ) b
ON a.memberid = b.memberid
AND mod(a.pvtime, 30) + 1 = b.number;

解释一下,上面的num是一张1列30行的表,对应1-30的正整数,把users表膨胀成N份(基于倾斜程度做一个合适的选择),然后把log数据根据memberid和pvtime分到不同的reduce里去,这样可以保证每个reduce分配到的数据可以相对均匀

过多的where条件

有的时候,我们会写超级多的where条件来限制查询,其实这样子是非常低效的,主要原因是因为这个and条件hive在生成执行计划时产生了一个嵌套层次很多的算子。

  • 解决方案:
    1)把筛选条件对应的值写入一张小表,再一次性join到主表;
    2)或者写个udfuser-defined function,用户定义函数),把这些预设值读取进去,udf来完成这个and数据过滤操作。

分组结果很多,但是你只需要topK

原写法:

SELECT mid, url, COUNT(1) AS cnt
FROM (
      SELECT *
      FROM r_atpanel_log
      WHERE pt = '20190610'
      AND pagetype = 'normal'
    ) subq
GROUP BY mid, url
ORDER BY cnt DESC
LIMIT 15;

优化写法:

SELECT *
FROM (
      SELECT mid, url, COUNT(1) AS cnt
      FROM (
            SELECT *
            FROM r_atpanel_log
            WHERE pt = '20190610'
            AND pagetype = 'normal'
          ) subq
      GROUP BY mid, url
) subq2
WHERE cnt > 100
ORDER BY cnt DESC
LIMIT 15;

可以看出,我们先过滤掉无关的内容,再进行排序,这样子快很多。

Spark解决数据倾斜具体方法

解决方案一:使用Hive ETL预处理数据

方案适用场景:

导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用SparkHive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:

此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:

这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者joinshuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优点:

实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

方案缺点:

治标不治本,Hive ETL中还是会发生数据倾斜

<

以上是关于DataSkew —— 数据倾斜问题解析及解决方案实践总结小记的主要内容,如果未能解决你的问题,请参考以下文章

Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势

实操 | Hive 数据倾斜问题定位排查及解决

实操 | Hive 数据倾斜问题定位排查及解决

hive数据倾斜及解决方案

大数据之Hive:Hive数据倾斜问题及解决方案

Spark常见数据倾斜情况及调优方案