Hive3 - 性能优化

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive3 - 性能优化相关的知识,希望对你有一定的参考价值。

一、存储格式修改

Hive数据存储的本质还是HDFS,所有的数据读写都基于HDFS的文件来实现,为了提高对HDFS文件读写的性能,Hive中提供了多种文件存储格式:TextFile、SequenceFile、RCFile、ORC、Parquet等,Hive 默认情况下为了避免各种编码及数据错乱的问题使用的是 TextFile 格式存储。

Hive 中指定存储格式通过stored关键字进行:

1. SequenceFile格式存储

SequenceFileHadoop里用来存储序列化的键值对即二进制的一种文件格式,以二进制的KV形式存储数据,与底层交互更加友好,性能更快,可已被压缩、分割,从而优化磁盘利用率和I/O,支持并行操作数据,查询效率高。

create table test_xbc10(
       id int,
       name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as sequencefile;

2. Parquet 格式存储

ParquetHadoop生态圈中主流的列式存储格式,是一种支持嵌套数据模型对的列式存储系统,作为大数据系统中OLAP查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式。通过数据编码和压缩,以及映射下推和谓词下推功能,Parquet的性能也较之其它文件格式有所提升。

create table test_xbc11(
       id int,
       name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as parquet;

3. ORC 格式存储及索引

ORC 最初产生自Apache Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗。

create table test_xbc12(
       id int,
       name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as orc;
ORC 文件索引

ORC提供了两种索引机制:Row Group IndexBloom Filter Index可以帮助提高查询ORC文件的性能,当用户写入数据时,可以指定构建索引,当用户查询数据时,可以根据索引提前对数据进行过滤,避免不必要的数据扫描。

1. Row Group Index

一个ORC文件包含一个或多个stripes(groups of row data),每个stripe中包含了每个columnmin/max值的索引数据,当查询中有<,>,=的操作时,会根据min/max值,跳过扫描不包含的stripes

创建该类型索引,需要在建表时,指定表参数 "orc.create.index"="true" ,需要注意的是,为了使Row Group Index有效利用,向表中加载数据时,必须对需要使用索引的字段进行排序,否则,min/max会失去意义。另外,这种索引主要用于数值型字段的范围查询过滤优化上。

开启索引配置:

set hive.optimize.index.filter=true;

创建表,并构建索引:

create table test_xbc13(
       id int,
       name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as orc tblproperties ("orc.create.index"="true");
2. Bloom Filter Index

建表时候,通过表参数”orc.bloom.filter.columns”=”columnName……”来指定为哪些字段建立BloomFilter索引,这样,在生成数据的时候,会在每个stripe中,为该字段建立BloomFilter的数据结构,当查询条件中包含对该字段的=号过滤时候,先从BloomFilter中获取以下是否包含该值,如果不包含,则跳过该stripe

创建表,并构建索引:

create table test_xbc13(
       id int,
       name string
)
row format delimited
fields terminated by ","
lines terminated by '\\n'
stored as orc tblproperties ("orc.create.index"="true","orc.bloom.filter.columns"="id,name");
ORC 矢量化查询

Hive 默认查询时,执行引擎一次处理一行,而矢量化查询执行是一种Hive针对ORC文件操作的特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。

注意:要使用矢量化查询执行,就必须以ORC格式存储数据。

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

二、数据压缩

Hive底层是运行的MapReduce程序,磁盘I/O操作、网络数据传输、shufflemerge要花大量的时间,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O和网络流量。

Hive中的压缩就是使用了Hadoop中的压缩实现的,所以Hadoop中支持的压缩在Hive中都可以直接使用。


配置MapReduce开启输出压缩及配置压缩类型:

-开启输出压缩
set mapreduce.output.fileoutputformat.compress=true;
--配置压缩类型为Snappy
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

配置Hive开启中间结果压缩和输出压缩及配置压缩类型

-- 中间结果压缩
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- 输出结果压缩
set hive.exec.compress.output=true;

Hive 遵循谁用谁配置的原则,如果需要全局配置,则需修改 hive-site.xml 文件

三、避免小文件生成

Hive 存储的本质还是HDFSHDFS是不利于小文件存储的,因为每个小文件会产生一条元数据信息,并且MapReduce中每个小文件会启动一个MapTask计算处理,导致资源的浪费。

当使用Reduce进行聚合计算时,并不能清楚的知道每个Reduce最终会生成的结果的数据大小,因此在Hive中为我们提供了一个特殊的机制,可以自动的判断是否是小文件,如果是小文件可以自动将小文件进行合并。

-- 如果hive的程序,只有maptask,将MapTask产生的所有小文件进行合并
set hive.merge.mapfiles=true;
-- 如果hive的程序,有Map和ReduceTask,将ReduceTask产生的所有小文件进行合并
set hive.merge.mapredfiles=true;
-- 每一个合并的文件的大小为256M
set hive.merge.size.per.task=268435456;
-- 平均每个文件的大小,如果小于这个值就会进行合并
set hive.merge.smallfiles.avgsize=104857600;

四、读取小文件

MapReduce中每个小文件会启动一个MapTask计算处理,造成资源的浪费,尽管上面的方式可以进行文件的合并,但是有可能因为场景的不同还是产品小文件,因此在读取时就需要进行小文件的合并,在MapReduce中可以使用 CombineTextInputFormat 用于将小文件合并,在 Hive中也提供了类似的工具:CombineHiveInputFormat

-- 设置Hive中底层MapReduce读取数据的输入类:将所有文件合并为一个大文件作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

五、预防数据倾斜

分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是,当我们提交运行一个程序时,这个程序的大多数的Task都已经运行结束了,只有某一个Task一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候我们就可以判定程序出现了数据倾斜的问题。这个Task运行过慢的原因在于这个Task的负载要比其他Task的负载要高,直观的原因在于Task的数据分配不均衡。

什么情况下会出现数据倾斜:

  • 数据本身就是倾斜的,数据中某种数据出现的次数过多。
  • 分区规则导致这些相同的数据都分配给了同一个Task。

出现了数据倾斜该怎么解决呢,可以从两个角度入手,第一个 group By 的情况下,第二个是 Join 的情况下:

group By数据倾斜

如果数据本身就是倾斜的,group ByReduces 阶段肯定也是倾斜的,这种可以进行 Map 端聚合,从而减少Reduce的输入量,在 Hive 中可以使用下面指定:

-- 开启Map端聚合:Combiner
set hive.map.aggr=true;

还可以通过随机分区的方式,默认由Hive自己选择分区的字段,也可以使用 distribute by 指定底层的MapReduce按照哪个字段作为Key实现分区、分组,通过rank函数随机值实现随机分区,避免数据倾斜

-- SQL中避免数据倾斜,构建随机分区
select * from table distribute by rand();

在 Hive 中还可以自动构建随机分区并自动聚合,当前程序会自动通过两个MapReduce来运行,第一个MapReduce自动进行随机分区,然后实现聚合,第二个MapReduce将聚合的结果再按照业务进行处理,得到结果。使用下面参数开启:

-- 开启随机分区,走两个MapReduce 
set hive.groupby.skewindata=true;

Join数据倾斜

如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题。

这种情况,可以考虑下其中一张表是否可以过滤,使用子查询,将大数据变小数据:

例如:现在有两张表订单表A与用户表B,需要实现查询订单的用户信息,关联字段为userid

A表:订单表,1000万条,字段:orderId,userId,produceId,price
B表:用户信息表,100万条,字段:userid,username,age,phone

如果两张表直接 Join 的话,由于两张表比较大,无法走Map Join,只能走Reduce Join,容易产生数据倾斜。此时我们可以先通过子查询查出所有A表的所有的去重的 userid ,然后在 join B 表得到A表去重用户信息,然后再 join A 表,这样就可以实现一个大表,一个小表的场景:

SELECT
	a.*,
	c.* 
FROM
	(
	SELECT
		b.* 
	FROM
		( SELECT DISTINCT a.userid FROM A a )  t,
		B b 
		ON t.userid = b.userid 
	) c,
	A a 
	ON c.userid = a.userid;

有些情况过滤后的数据依旧很大,这种情况可以通过分桶,减少 Join 的大小,从而避免 Reduce Join ,但是要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数

-- 开启分桶join
set hive.optimize.bucketmapjoin = true;

不过在 Hive 中考虑到了数据倾斜问题,因此在 Hive 中提供了 Skew Join ,这种Join的原理是将Map JoinReduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现,其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和Reduce Join的结果进行Union合并:

-- 开启运行过程中skewjoin
set hive.optimize.skewjoin=true;
-- 如果这个key的出现的次数超过这个范围
set hive.skewjoin.key=100000;
-- 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoin.compiletime=true;
-- 不合并,提升性能
set hive.optimize.union.remove=true;
-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;

六、并行执行

Hive在计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的,例如Union语句,Join语句等等,这些Stage没有依赖关系,但Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执行,提高性能。

-- 开启Stage并行化,默认为false
SET hive.exec.parallel=true;
-- 指定并行化线程数,默认为8
SET hive.exec.parallel.thread.number=16; 

七、关联优化

例如执行下面这种操作场景:

select  * from table group by createTime order by createTime desc;

该SQL语句转换为MapReduce时,会生成两个 MapReduces 程序执行,第一个MapReducegroup by 然后 shuffle 后对id做分组,第二个MapReduce 对第一个MapReduce的结果做order by

这种情况下,由于分组和排序都是一个字段,完全可以使用一个MapReduceshuffle 做分组和排序,此时我们可以指定以下参数,对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce中实现:

set hive.optimize.correlation=true;

八、使用CBO优化器引擎

例如:当前有一张表只有100条数据,其中 id 构建了索引,id =100的有90条,如果现在查询所有id = 10 的数据,由于id构建了索引,索引默认的优化器引擎RBO,会选择先从索引中查询id = 10的值所在的位置,再根据索引记录位置去读取对应的数据,但是这并不是最佳的执行方案。有id=10的值占了总数据的90%,这时候直接检索数据返回,要比索引后效率更高,更节省资源,这种方式就是CBO优化器引擎会选择的方案。Hive中也支持RBOCBO这两种引擎,默认使用的是RBO优化器引擎。

使用CBO引擎更加智能,可以通过下面配置,修改优化器引擎为CBO引擎:

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

九、Analyze分析优化器

CBO引擎一般搭配analyze分析优化器一起使用,因为CBO引擎需要知道构建数据的元数据,才能基于代价选择合适的处理计划,而Analyze分析优化器用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,很好的配合了 CBO 引擎:

构建表中分区数据的元数据信息:

analyze table test_table partition(addr) compute statistics;

构建表中列的数据的元数据信息:

analyze table test_table compute statistics for columns  userid;

查看构建的列的元数据:

desc formatted test_table userid;

以上是关于Hive3 - 性能优化的主要内容,如果未能解决你的问题,请参考以下文章

【工作】Presto 集群实测,以及与Spark3、Hive3性能对比

hive3之执行计划(Explain)Fetch 抓取本地模式表的优化Group By笛卡尔积行列过滤

hive3之执行计划(Explain)Fetch 抓取本地模式表的优化Group By笛卡尔积行列过滤

Web性能优化:多种优化CSS和加快网站速度的方法

MySQL的优化点总结---通过计算多种状态的百分比看MySQL的性能情况

大数组分时加载算法 timedChunk