DataSkew —— 数据倾斜问题解析及解决方案实践总结小记
Posted 扫地增
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DataSkew —— 数据倾斜问题解析及解决方案实践总结小记相关的知识,希望对你有一定的参考价值。
文章目录
- 什么是数据倾斜
- 数据倾斜的现象
- 数据倾斜产生的原因
- 数据倾斜产生的原理
- 产生数据倾斜的操作
- 不同情形倾斜数据处理方案
- Hql和SparkSql中处理
- Spark解决数据倾斜具体方法
- 结语
注意,要区分开数据倾斜与数据量过量这两种情况,
数据倾斜
是指少数task被分配了绝大多数的数据,因此少数task运行缓慢;数据过量
是指所有task被分配的数据量都很大,相差不多,所有task都运行缓慢。
什么是数据倾斜
简单的讲,数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢。
hash
的时候,如果有一个值特别多,那么hash的操作是均分不掉数据的,只能放到一个机器上去处理,这就是数据倾斜。
相信大部分做过数据的朋友都会遇到数据倾斜,数据倾斜会发生在数据开发的各个环节中,比如:
- 用
Hive
算数据的时候reduce
阶段卡在99.99%
- 用SparkStreaming做实时算法时候,一直会有
executor出现OOM的错误,但是其余的executor内存使用率却很低
。数据倾斜有一个关键因素是数据量大,可以达到千亿级。其实所谓的数据倾斜就是数据分布不均匀。
数据倾斜的现象
Hadoop中的数据倾斜
Hadoop
中直接贴近用户使用使用的时Mapreduce
程序和Hive
程序,虽说Hive
最后也是用MR
来执行(至少目前Hive
内存计算并不普及),但是毕竟写的内容逻辑区别很大,一个是程序,一个是Sql
,因此这里稍作区分。Hadoop
中的数据倾斜主要表现在ruduce
阶段卡在99.99%
,一直99.99%
不能结束。这里如果详细的看日志或者和监控界面的话会发现:
有一个多几个reduce卡住
各种container报错OOM
读写的数据量极大,至少远远超过其它正常的reduce
伴随着数据倾斜,会出现任务被kill等各种诡异的表现。
Spark中的数据倾斜
Spark
中的数据倾斜也很常见,这里包括Spark Streaming
和Spark Sql
,表现主要有下面几种:
Executor lost
,OOM
,Shuffle过程出错
Driver OOM
单个Executor执行时间特别久,整体任务卡在某个阶段不能结束
正常运行的任务突然失败
补充一下,在
Spark streaming程序中,数据倾斜更容易出现
,特别是
在程序中包含一些类似sql的join、group这种操作的时候
。 因为Spark Streaming程序在运行的时候,我们一般不会分配特别多的内存,因此一旦在这个过程中出现一些数据倾斜,就十分容易造成OOM。
Hive中的数据倾斜
任务进度长时间维持在
99%(或100%)
,查看任务监控页面,发现只有少量(1个或几个)reduce
子任务未完成。因为其处理的数据量和其他reduce
差异过大。单一reduce
的记录数与平均记录数差异过大,通常可能达到3
倍甚至更多。 最长时长远大于平均时长。
经验:
Hive
的数据倾斜,一般都发生在Sql
中Group By
和Join On
上,而且和数据逻辑绑定比较深。
数据倾斜产生的原因
Hive数仓为何会出现DateSkew
数仓中发生数据倾斜的根本原因就是:某值的的数量过多。导致谋职的数量过多的原因有三个。
产生原因:
- key 分布不均匀
- 业务数据本身的特性
- 空值,大量的游客用户登录访问。
- 数据类型不匹配,使用从
mysql
数据导过来的数据和hive
数仓里的数据做联合查询的时候,可能某个字段mysql
里的是int
类型,hive
里是string
类型,这些string
类型的数据就会积压在一起。- 表结构本身有问题,比如,这个地区字段,都是市,北京市是一个值,太原市是一个值,到时候肯定会北京市的数据量特别大导致数据倾斜。这个例子比较极端,只是想说地区人口差异很大,数据量也会有很大差距。
Spark为何会出现DateSkew
数据倾斜的原因:
出现数据倾斜的原因,基本只可能是因为发生了
shuffle
操作,在shuffle
的过程中,出现了数据倾斜的问题。因为某个或者某些key
对应的数据,远远的高于其他的key
。Shuffle
数据之后导致数据分布不均匀,但是所有节点的机器的性能都是一样的,程序也是一样的,就是数据量不一致,所以决定了task
的执行时长就被数据量决定了。
数据分区的策略:
随机分区:
每一个数据分配的任意一个分区的概率是均等的Hash分区:
使用数据的Hash分区值,%分区数。(导致数据倾斜的原因)范围分区:
将数据范围划分,数据分配到不同的范围中(分布式的全局排序)
定位数据倾斜问题
- 查阅代码中会产生
shuffle
的算子,例如distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition
等算子,根据代码逻辑判断此处是否会出现数据倾斜。- 查看
Spark
作业的log
文件,log
文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage
(哪一个stage
生成的task
特别慢),通过stage
定位到对应的shuffle
算子是哪一个,从而确定是什么地方发生数据倾斜。
查看数据倾斜的key的分布情况:
//使用spark中的抽样算子sample,查看相应的key的分布
val sampledPairs = pairs.sample(false, 0.1) //抽样
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
数据倾斜产生的原理
如果大家对
MapReduce
的shuffle
过程或者Spark
的shuffle
很熟悉的话完全不用看这个图的,因为图片也是笔者随便找的本不想放在这里。多说一句以MR
为例就是由于shuffle
后数据经过HashPartitioner
进行分组后形成数据集的数量均衡造成的。数据分组效果大致如下图:
产生数据倾斜的操作
关键操作 | 情形 | 结果 |
---|---|---|
Join | 其中一个表较小,但是key集中 | 分发到某一个或几个Reduce上的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值或者空值过多 | 这些空值都由一个redice处理,很慢 | |
group by | group by维度过小,某值得数量过多 | 处理某值的reduce十分耗时 |
Count(DISTINCT XXX) | 某特殊值过多 | 处理此特殊值的reduce十分耗时 |
reduceByKey | 某值得数量过多 | 处理某值的reduce十分耗时 |
countByKey | 某值得数量过多 | 处理某值的reduce十分耗时 |
groupByKey | 某值得数量过多 | 处理某值的reduce十分耗时 |
不同情形倾斜数据处理方案
Hql和SparkSql中处理
Join
小表join大表
主要考虑使用
Map Join
代替Reduce Join
。
使用map join
让小的维度表(1000
条以下的记录条数) 先进内存。在map
端完成reduce
。
由于笔者之前总结过map join
如何使用,这里就不过多陈述。可以参照hive sql数据倾斜——大表join小表如何使用map join
大表join大表skewjoin
- 当
key
存在无效值时:
把空key
变成一个字符串加上随机数,把倾斜的数据分到不同的reduce
上,由于null
值关联不上,处理后并不影响最终结果。处理方式如下:SELECT * FROM log a LEFT JOIN bmw_users b ON CASE WHEN a.user_id IS NULL THEN concat(‘dp_hive’,rand()) >ELSE a.user_id END = b.user_id;
- 当
key
值都是有效值时
可使用hive配置:-- 指定是否开启数据倾斜的join运行时优化,默认不开启即false。 set hive.optimize.skewjoin=true; -- 判断数据倾斜的阈值,如果在join中发现同样的key超过该值,则认为是该key是倾斜key。默认100000。 -- 一般可以设置成处理的总记录数/reduce个数的2-4倍。 set hive.skewjoin.key=100000; -- 指定是否开启数据倾斜的join编译时优化,默认不开启即false。 set hive.optimize.skewjoin.compiletime=true;
具体来说,会基于存储在原数据中的倾斜
key
,在编译时为导致倾斜的key
单独创建执行计划,而其他key
也有一个执行计划用来join
。然后,对上面生成的两个join
执行后求并集。因此,除非相同的倾斜key
同时存在于这两个join
表中,否则对于引起倾斜的key
的join
就会优化为map-side join
。此外,该参数与hive.optimize.skewjoin
之间的主要区别在于,此参数使用存储在metastore
中的倾斜信息在编译时来优化执行计划。如果元数据中没有倾斜信息,则此参数无效。一般可将这两个参数都设为true
。如果元数据中有倾斜信息,则hive.optimize.skewjoin
不做任何操作。
数据类型不匹配
数据类型不匹配,可以直接把两个表的主键直接统一。
select *
from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
关联主键含有大量空key
这种情况需要按照场景进行处理,分为
单条件联合多条件联合查询
。如果是多条件联合查询可以将多个主键进行拼接产生新的主键来尽可能避免主键空key的情况
,当然这种情况需要与业务数据紧密结合,尽可能多的确保数据中多个关连键有一个不为空,是的空key
数量保持在正常分组水平,也可以极大提高查询性能。如果实在不能确保那么我们应该对每一个关联主键按照单条件关联的方式进行处理。实例如下:
--发生数据倾斜的SQL
select *
from advertisement_log log
left join user_center user
on log.open_id = user.open_id
and log.union_id = user.union_id
--调整后的sql
select *
from advertisement_log log
left join user_center user
on concat_ws('_',log.open_id,log.union_id) = concat_ws('_',user.open_id,user.union_id)
如果是单条件联合查询建议使用如下实例两个方式:
解决方式1:user_id为空的不参与关联
SELECT *
FROM log a
JOIN bmw_users b
ON a.user_id IS NOT NULL
AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;
解决方式2:赋与空值分新的key值
SELECT *
FROM log a
LEFT JOIN bmw_users b
ON CASE WHEN a.user_id IS NULL THEN concat(‘dp_hive’,rand()) ELSE a.user_id END = b.user_id;
结论:
方法2比方法1效率更好,不但
io
少了,而且作业数也少了。解决方法1log
表被读取了两次,jobs
是2
。这个适合优化无效id
(比如-99 , ’’, null
等) 产生的倾斜问题。把空key
变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce
上 ,解决数据倾斜问题。
关联主键中少部分key的数据量巨大
在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。
这种情况笔者是在对日志表表数据做处理时遇到的,主要是不同埋点的日志数据,热埋点和冷埋点被点击的数据量偏差非常巨大,相差四五个数据级。笔者是按照需求设计埋点表字典表,按照数据量的级别对埋点进行划分,分别进行
join
,最后使用union all
将数据进行合并。这种处理方式需要对数据进行深入的分析,但是处理比较简单有效,原理和空key
的解决方式1类似。实例如下:
字典表:
CREATE EXTERNAL TABLE dim.event_v1`
(
`event_id` STRING COMMENT '事件ID',
`describe` STRING COMMENT '事件(埋点)描述',
`district` STRING COMMENT '事件区域',
`part` STRING COMMENT '事件所属部分',
`action` STRING COMMENT '行为[点击,展示]',
`work_table` STRING COMMENT '使用表'
)
STORED AS textfile
LOCATION '/big-data/dim/event_v1';
解决方式:
SELECT /*+MAPJOIN(event)*/
user_id,
app_id,
provice,
city,
district
FROM fct.log log
JOIN (
SELECT
FROM dim.event_1
WHERE work_table = 'big_event_log'
)event
ON log.event_id = event.event_id
UNION ALL
SELECT /*+MAPJOIN(event)*/
user_id,
app_id,
provice,
city,
district
FROM fct.log log
JOIN (
SELECT
FROM dim.event_1
WHERE work_table = 'small_event_log'
)event
ON log.event_id = event.event_id
Join驱动表选取和数据量优化:
- 关于驱动表的选取,
选用join key分布最均匀的表作为驱动表
;- 做好
列裁剪和filter操作
,以达到两表做join
的时候,数据量相对变小
的效果。
left semi join
这种
join的应用场景主要是用来代替sql中的in
,来提升性能,使用于大表join小表的一些场景中,如果关联的结果只需要保留左表中的字段数据且重复数据在结果中只需要出现一次,那么这个时候就可以使用left semi join来避免因为关联主键某些值大量出现引起的倾斜
。在后续的文章中笔者会对left semi join
用法进行总结。
Join 倾斜总结
关于
join
的倾斜总结目前笔者遇到基本就是这些,如果后续遇到其他情况笔者在更。
Count(DISTINCT XXX)
count distinct
时,会将值为空的情况单独处理,比如可以直接过滤空值的行,在最后结果中加1
。如果还有其他计算,需要进行group by
,可以先将值为空的记录单独处理,再和其他计算结果进行union
。
能先进行group
操作的时候先进行group
操作,把key
先进行一次reduce
,之后再进行count
或者distinct count
操作。
group by
注:
group by
优于distinct group
。
采用sum() + group by
的方式来替换count(distinct)
完成计算。
增加Reuducer个数
默认是由参数
hive.exec.reducers.bytes.per.reducer
来推断需要的Reducer
个数。可通过mapred.reduce.tasks
控制。
调优
hive.map.aggr=true
--开启map端combiner
set hive.map.aggr=true
开启
map combiner
。在map
中会做部分聚集操作,效率更高但需要更多的内存。
假如map
各条数据基本上不一样, 聚合没什么意义,做combiner
反而画蛇添足,hive
里也考虑的比较周到通过参数可以进行相关的设置:hive.groupby.mapaggr.checkinterval = 100000 (默认) hive.map.aggr.hash.min.reduction=0.5(默认)
hive.groupby.skewindata=true
--开启数据倾斜时负载均衡
set hive.groupby.skewindata=true;
就是先随机分发并处理,再按照
key group by
来分发处理。
当选项设定为true
,生成的查询计划会有两个MRJob
。
- 第一个
MRJob
中,Map
的输出结果集合会随机分布到Reduce
中,每个Reduce
做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key
有可能被分发到不同的Reduce
中,从而达到负载均衡的目的;- 第二个
MRJob
再根据预处理的数据结果按照GroupBy Key
分布到Reduce
中(这个过程可以保证相同的原始GroupBy Key
被分布到同一个Reduce
中),最后完成最终的聚合操作。
它使计算变成了两个mapreduce
,先在第一个中在shuffle
过程partition
时随机给key
打标记,使每个key
随机均匀分布到各个reduce
上计算,但是这样只能完成部分计算,因为相同key
没有分配到相同reduce
上。所以需要第二次的mapreduce
,这次就回归正常shuffle
,但是数据分布不均匀的问题在第一次mapreduce
已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次mr
中随机分布到各个节点完成。
增加并行度
- 场景: 两个大表,数据分布均匀,为了提高效率,使用
mapjoin
,采用切分大表的方法。- 方法: 采用将大表切分为小表,然后进行连接。
原始测试表
+----------+------------+
| test.id | test.name |
+----------+------------+
| 1 | aa |
| 2 | bb |
| 3 | cc |
| 4 | dd |
+----------+------------+
将其切分为两个:
select * from test tablesample(bucket 1 out of 2 on id);
+----------+------------+
| test.id | test.name |
+----------+------------+
| 2 | bb |
| 4 | dd |
+----------+------------+
select * from test tablesample(bucket 2 out of 2 on id);
+----------+------------+
| test.id | test.name |
+----------+------------+
| 1 | aa |
| 3 | cc |
+----------+------------+
切分为四个:
select * from test tablesample(bucket 1 out of 4 on id);
+----------+------------+
| test.id | test.name |
+----------+------------+
| 4 | dd |
+----------+------------+
select * from test tablesample(bucket 2 out of 4 on id);
+----------+------------+
| test.id | test.name |
+----------+------------+
| 1 | aa |
+----------+------------+
select * from test tablesample(bucket 3 out of 4 on id);
+----------+------------+
| test.id | test.name |
+----------+------------+
| 2 | bb |
+----------+------------+
select * from test tablesample(bucket 4 out of 4 on id);
+----------+------------+
| test.id | test.name |
+----------+------------+
| 3 | cc |
+----------+------------+
tablesample(bucket 3 out of 4 on id)
,其中tablesample
为关键字,bucket
关键字,3
为要去的分表,4
为拆分表的数目,id
拆分依据。
*多表 union all 会优化成一个 job
推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。
SELECT *
FROM effect a
JOIN (
SELECT auction_id AS auction_id
FROM auctions
UNION ALL
SELECT auction_string_id AS auction_id
FROM auctions
) b
ON a.auction_id = b.auction_id;
这样子比分别过滤
数字 id,字符串 id
,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个<key,value> 对,<t,数字id,value>,<t,字符串id,value>
。所以商品表的HDFS(Hadoop Distributed File System)
读只会是一次。
消灭子查询内的 group by
原写法:
SELECT *
FROM (
SELECT *
FROM t1
GROUP BY c1, c2, c3
UNION ALL
SELECT *
FROM t2
GROUP BY c1, c2, c3
) t3
GROUP BY c1, c2, c3;
优化写法:
SELECT *
FROM (
SELECT *
FROM t1
UNION ALL
SELECT *
FROM t2
) t3
GROUP BY c1, c2, c3;
从业务逻辑上说,子查询内的 group by 功能与外层的 group by 重复,除非子查询内有 count(distinct)
。经过测试,并未出现union all
的hive bug
,数据是一致的。MR
的作业数由3
减少到1
。t1
相当于一个目录,t2
相当于一个目录,对map reduce
程序来说,t1,t2
可以做为map reduce
作业的mutli inputs
。这可以通过一个map reduce
来解决这个问题。Hadoop
的计算框架,不怕数据多,怕作业数多。
消灭子查询内的count(distinct),max,min
原写法:
SELECT c1, c2, c3, sum(pv)
FROM (
SELECT c1, c2, c3, COUNT(c4)
FROM t1
GROUP BY c1, c2, c3
UNION ALL
SELECT c1, c2, c3, COUNT(DISTINCT c4)
FROM t2
GROUP BY c1, c2, c3
) t3
GROUP BY c1, c2, c3;
这种我们不能直接
union
再group by
,因为其中有一个表的操作用到了去重,这种情况,我们可以通过建立临时表来消灭这种数据倾斜问题。
优化写法:
INSERT INTO t4
SELECT c1, c2, c3, COUNT(DISTINCT c4)
FROM t2
GROUP BY c1, c2, c3;
SELECT c1, c2, c3, SUM(pv)
FROM (
SELECT c1, c2, c3, COUNT(c4)
FROM t1
UNION ALL
SELECT *
FROM t4
) t3
GROUP BY c1, c2, c3;
reduce的时间过长
假设一个memberid对应的log里有很多数据,那么最后合并的时候,也是十分耗时的,所以,这里需要找到一个方法来解决这种reduce分配不均的问题。
解决方法:
SELECT *
FROM log a
LEFT JOIN (
SELECT memberid, number
FROM users d
JOIN num e
) b
ON a.memberid = b.memberid
AND mod(a.pvtime, 30) + 1 = b.number;
解释一下,上面的num是一张1列30行的表,对应1-30的正整数,
把users表膨胀成N份(基于倾斜程度做一个合适的选择),然后把log数据根据memberid和pvtime分到不同的reduce里去,这样可以保证每个reduce分配到的数据可以相对均匀
。
过多的where条件
有的时候,我们会写超级多的where条件来限制查询,其实这样子是非常低效的,主要原因是因为这个and条件hive在生成执行计划时产生了一个嵌套层次很多的算子。
- 解决方案:
1)把筛选条件对应的值写入一张小表,再一次性join
到主表;
2)或者写个udf
(user-defined function
,用户定义函数),把这些预设值读取进去,udf
来完成这个and
数据过滤操作。
分组结果很多,但是你只需要topK
原写法:
SELECT mid, url, COUNT(1) AS cnt
FROM (
SELECT *
FROM r_atpanel_log
WHERE pt = '20190610'
AND pagetype = 'normal'
) subq
GROUP BY mid, url
ORDER BY cnt DESC
LIMIT 15;
优化写法:
SELECT *
FROM (
SELECT mid, url, COUNT(1) AS cnt
FROM (
SELECT *
FROM r_atpanel_log
WHERE pt = '20190610'
AND pagetype = 'normal'
) subq
GROUP BY mid, url
) subq2
WHERE cnt > 100
ORDER BY cnt DESC
LIMIT 15;
可以看出,我们先过滤掉无关的内容,再进行排序,这样子快很多。
Spark解决数据倾斜具体方法
解决方案一:使用Hive ETL预处理数据
方案适用场景:
导致数据倾斜的是
Hive
表。如果该Hive
表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark
对Hive
表执行某个分析操作,那么比较适合使用这种技术方案。
方案实现思路:
此时可以评估一下,是否可以通过
Hive
来进行数据预处理(即通过Hive ETL
预先对数据按照key
进行聚合,或者是预先和其他表进行join
),然后在Spark
作业中针对的数据源就不是原来的Hive
表了,而是预处理后的Hive
表。此时由于数据已经预先进行过聚合或join
操作了,那么在Spark
作业中也就不需要使用原先的shuffle
类算子执行这类操作了。
方案实现原理:
这种方案从根源上解决了数据倾斜,因为彻底避免了在
Spark
中执行shuffle
类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本
。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL
中进行group by
或者join
等shuffle
操作时,还是会出现数据倾斜,导致Hive ETL
的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL
中,避免Spark
程序发生数据倾斜而已。
方案优点:
实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,
Spark
作业的性能会大幅度提升。
方案缺点:
治标不治本,
<Hive ETL中还是会发生数据倾斜
。以上是关于DataSkew —— 数据倾斜问题解析及解决方案实践总结小记的主要内容,如果未能解决你的问题,请参考以下文章