Hive3 - 性能优化
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive3 - 性能优化相关的知识,希望对你有一定的参考价值。
一、存储格式修改
Hive
数据存储的本质还是HDFS
,所有的数据读写都基于HDFS
的文件来实现,为了提高对HDFS
文件读写的性能,Hive
中提供了多种文件存储格式:TextFile、SequenceFile、RCFile、ORC、Parquet
等,Hive
默认情况下为了避免各种编码及数据错乱的问题使用的是 TextFile
格式存储。
在 Hive
中指定存储格式通过stored
关键字进行:
1. SequenceFile格式存储
SequenceFile
是Hadoop
里用来存储序列化的键值对即二进制的一种文件格式,以二进制的KV
形式存储数据,与底层交互更加友好,性能更快,可已被压缩、分割,从而优化磁盘利用率和I/O
,支持并行操作数据,查询效率高。
create table test_xbc10(
id int,
name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as sequencefile;
2. Parquet 格式存储
Parquet
是Hadoop
生态圈中主流的列式存储格式,是一种支持嵌套数据模型对的列式存储系统,作为大数据系统中OLAP
查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式。通过数据编码和压缩,以及映射下推和谓词下推功能,Parquet
的性能也较之其它文件格式有所提升。
create table test_xbc11(
id int,
name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as parquet;
3. ORC 格式存储及索引
ORC
最初产生自Apache Hive
,用于降低Hadoop
数据存储空间和加速Hive
查询速度。它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。ORC
文件是自描述的,它的元数据使用Protocol Buffers
序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗。
create table test_xbc12(
id int,
name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as orc;
ORC 文件索引
ORC
提供了两种索引机制:Row Group Index
和 Bloom Filter Index
可以帮助提高查询ORC
文件的性能,当用户写入数据时,可以指定构建索引,当用户查询数据时,可以根据索引提前对数据进行过滤,避免不必要的数据扫描。
1. Row Group Index
一个ORC
文件包含一个或多个stripes(groups of row data)
,每个stripe
中包含了每个column
的min/max
值的索引数据,当查询中有<,>,=
的操作时,会根据min/max
值,跳过扫描不包含的stripes
。
创建该类型索引,需要在建表时,指定表参数 "orc.create.index"="true"
,需要注意的是,为了使Row Group Index
有效利用,向表中加载数据时,必须对需要使用索引的字段进行排序,否则,min/max
会失去意义。另外,这种索引主要用于数值型字段的范围查询过滤优化上。
开启索引配置:
set hive.optimize.index.filter=true;
创建表,并构建索引:
create table test_xbc13(
id int,
name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as orc tblproperties ("orc.create.index"="true");
2. Bloom Filter Index
建表时候,通过表参数”orc.bloom.filter.columns”=”columnName……”
来指定为哪些字段建立BloomFilter
索引,这样,在生成数据的时候,会在每个stripe
中,为该字段建立BloomFilter
的数据结构,当查询条件中包含对该字段的=
号过滤时候,先从BloomFilter
中获取以下是否包含该值,如果不包含,则跳过该stripe
。
创建表,并构建索引:
create table test_xbc13(
id int,
name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as orc tblproperties ("orc.create.index"="true","orc.bloom.filter.columns"="id,name");
ORC 矢量化查询
Hive
默认查询时,执行引擎一次处理一行,而矢量化查询执行是一种Hive
针对ORC
文件操作的特性,目的是按照每批1024
行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。
注意:要使用矢量化查询执行,就必须以ORC
格式存储数据。
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
二、数据压缩
Hive
底层是运行的MapReduce
程序,磁盘I/O操作、网络数据传输、shuffle
和merge
要花大量的时间,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O
和网络流量。
Hive
中的压缩就是使用了Hadoop
中的压缩实现的,所以Hadoop
中支持的压缩在Hive
中都可以直接使用。
配置MapReduce
开启输出压缩及配置压缩类型:
-开启输出压缩
set mapreduce.output.fileoutputformat.compress=true;
--配置压缩类型为Snappy
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
配置Hive开启中间结果压缩和输出压缩及配置压缩类型
-- 中间结果压缩
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- 输出结果压缩
set hive.exec.compress.output=true;
Hive
遵循谁用谁配置的原则,如果需要全局配置,则需修改 hive-site.xml
文件
三、避免小文件生成
Hive
存储的本质还是HDFS
,HDFS
是不利于小文件存储的,因为每个小文件会产生一条元数据信息,并且MapReduce
中每个小文件会启动一个MapTask
计算处理,导致资源的浪费。
当使用Reduce
进行聚合计算时,并不能清楚的知道每个Reduce
最终会生成的结果的数据大小,因此在Hive
中为我们提供了一个特殊的机制,可以自动的判断是否是小文件,如果是小文件可以自动将小文件进行合并。
-- 如果hive的程序,只有maptask,将MapTask产生的所有小文件进行合并
set hive.merge.mapfiles=true;
-- 如果hive的程序,有Map和ReduceTask,将ReduceTask产生的所有小文件进行合并
set hive.merge.mapredfiles=true;
-- 每一个合并的文件的大小为256M
set hive.merge.size.per.task=268435456;
-- 平均每个文件的大小,如果小于这个值就会进行合并
set hive.merge.smallfiles.avgsize=104857600;
四、读取小文件
MapReduce
中每个小文件会启动一个MapTask
计算处理,造成资源的浪费,尽管上面的方式可以进行文件的合并,但是有可能因为场景的不同还是产品小文件,因此在读取时就需要进行小文件的合并,在MapReduce
中可以使用 CombineTextInputFormat
用于将小文件合并,在 Hive
中也提供了类似的工具:CombineHiveInputFormat
:
-- 设置Hive中底层MapReduce读取数据的输入类:将所有文件合并为一个大文件作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
五、预防数据倾斜
分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是,当我们提交运行一个程序时,这个程序的大多数的Task
都已经运行结束了,只有某一个Task
一直在运行,迟迟不能结束,导致整体的进度卡在99%
或者100%
,这时候我们就可以判定程序出现了数据倾斜的问题。这个Task
运行过慢的原因在于这个Task
的负载要比其他Task
的负载要高,直观的原因在于Task
的数据分配不均衡。
什么情况下会出现数据倾斜:
- 数据本身就是倾斜的,数据中某种数据出现的次数过多。
- 分区规则导致这些相同的数据都分配给了同一个Task。
出现了数据倾斜该怎么解决呢,可以从两个角度入手,第一个 group By
的情况下,第二个是 Join
的情况下:
group By数据倾斜
如果数据本身就是倾斜的,group By
到 Reduces
阶段肯定也是倾斜的,这种可以进行 Map
端聚合,从而减少Reduce的输入量,在 Hive
中可以使用下面指定:
-- 开启Map端聚合:Combiner
set hive.map.aggr=true;
还可以通过随机分区的方式,默认由Hive
自己选择分区的字段,也可以使用 distribute by
指定底层的MapReduce
按照哪个字段作为Key
实现分区、分组,通过rank函数随机值实现随机分区,避免数据倾斜
-- SQL中避免数据倾斜,构建随机分区
select * from table distribute by rand();
在 Hive 中还可以自动构建随机分区并自动聚合,当前程序会自动通过两个MapReduce
来运行,第一个MapReduce
自动进行随机分区,然后实现聚合,第二个MapReduce
将聚合的结果再按照业务进行处理,得到结果。使用下面参数开启:
-- 开启随机分区,走两个MapReduce
set hive.groupby.skewindata=true;
Join数据倾斜
如果两张表比较大,无法实现Map Join
,只能走Reduce Join
,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题。
这种情况,可以考虑下其中一张表是否可以过滤,使用子查询,将大数据变小数据:
例如:现在有两张表订单表A与用户表B,需要实现查询订单的用户信息,关联字段为userid
。
A表:订单表,1000万条,字段:orderId,userId,produceId,price
等
B表:用户信息表,100万条,字段:userid,username,age,phone
等
如果两张表直接 Join 的话,由于两张表比较大,无法走Map Join,只能走Reduce Join,容易产生数据倾斜。此时我们可以先通过子查询查出所有A表的所有的去重的 userid
,然后在 join
B 表得到A表去重用户信息,然后再 join
A 表,这样就可以实现一个大表,一个小表的场景:
SELECT
a.*,
c.*
FROM
(
SELECT
b.*
FROM
( SELECT DISTINCT a.userid FROM A a ) t,
B b
ON t.userid = b.userid
) c,
A a
ON c.userid = a.userid;
有些情况过滤后的数据依旧很大,这种情况可以通过分桶,减少 Join
的大小,从而避免 Reduce Join
,但是要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数
-- 开启分桶join
set hive.optimize.bucketmapjoin = true;
不过在 Hive
中考虑到了数据倾斜问题,因此在 Hive
中提供了 Skew Join
,这种Join
的原理是将Map Join
和Reduce Join
进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join
来实现,其他没有产生数据倾斜的数据由Reduce Join
来实现,这样就避免了Reduce Join
中产生数据倾斜的问题,最终将Map Join
的结果和Reduce Join
的结果进行Union
合并:
-- 开启运行过程中skewjoin
set hive.optimize.skewjoin=true;
-- 如果这个key的出现的次数超过这个范围
set hive.skewjoin.key=100000;
-- 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoin.compiletime=true;
-- 不合并,提升性能
set hive.optimize.union.remove=true;
-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;
六、并行执行
Hive
在计算运行时,会解析为多个Stage
,有时候Stage
彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage
之间是没有依赖关系的,例如Union
语句,Join
语句等等,这些Stage
没有依赖关系,但Hive
依旧默认挨个执行每个Stage
,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage
之间没有依赖关系时,允许多个Stage
并行执行,提高性能。
-- 开启Stage并行化,默认为false
SET hive.exec.parallel=true;
-- 指定并行化线程数,默认为8
SET hive.exec.parallel.thread.number=16;
七、关联优化
例如执行下面这种操作场景:
select * from table group by createTime order by createTime desc;
该SQL语句转换为MapReduce
时,会生成两个 MapReduces
程序执行,第一个MapReduce
做 group by
然后 shuffle
后对id
做分组,第二个MapReduce
对第一个MapReduce
的结果做order by
。
这种情况下,由于分组和排序都是一个字段,完全可以使用一个MapReduce
的shuffle
做分组和排序,此时我们可以指定以下参数,对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce
中实现:
set hive.optimize.correlation=true;
八、使用CBO优化器引擎
例如:当前有一张表只有100条数据,其中 id
构建了索引,id =100
的有90
条,如果现在查询所有id = 10
的数据,由于id
构建了索引,索引默认的优化器引擎RBO
,会选择先从索引中查询id = 10
的值所在的位置,再根据索引记录位置去读取对应的数据,但是这并不是最佳的执行方案。有id=10
的值占了总数据的90%
,这时候直接检索数据返回,要比索引后效率更高,更节省资源,这种方式就是CBO
优化器引擎会选择的方案。Hive
中也支持RBO
与CBO
这两种引擎,默认使用的是RBO
优化器引擎。
使用CBO
引擎更加智能,可以通过下面配置,修改优化器引擎为CBO
引擎:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
九、Analyze分析优化器
CBO
引擎一般搭配analyze分析优化器一起使用,因为CBO
引擎需要知道构建数据的元数据,才能基于代价选择合适的处理计划,而Analyze
分析优化器用于提前运行一个MapReduce
程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,很好的配合了 CBO
引擎:
构建表中分区数据的元数据信息:
analyze table test_table partition(addr) compute statistics;
构建表中列的数据的元数据信息:
analyze table test_table compute statistics for columns userid;
查看构建的列的元数据:
desc formatted test_table userid;
以上是关于Hive3 - 性能优化的主要内容,如果未能解决你的问题,请参考以下文章
【工作】Presto 集群实测,以及与Spark3、Hive3性能对比
hive3之执行计划(Explain)Fetch 抓取本地模式表的优化Group By笛卡尔积行列过滤
hive3之执行计划(Explain)Fetch 抓取本地模式表的优化Group By笛卡尔积行列过滤