Hive On Tez小文件合并的技术调研

Posted 虎鲸不是鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hive On Tez小文件合并的技术调研相关的知识,希望对你有一定的参考价值。

Hive On Tez小文件合并的技术调研

背景

在升级到CDP7.1.5之后,默认的运算引擎变成了Tez,之前这篇有讲过:

https://lizhiyong.blog.csdn.net/article/details/126688391

具体参考Cloudera的官方文档:https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/hive-introduction/topics/hive-unsupported.html

并且只能用Tez,调度、血缘等重度依赖租来的阿里云DataPhin,那么最常用的离线跑批任务还是要使用HQL【也就是Hive On Tez】。HQL上手门槛极低,之前搞那种Oracle数据库开发的也可以一周内升任Sql Boy岗位,这是其一;PySpark任务或者用Java/Scala打Jar包的Spark任务由于平台的缺陷无法记录血缘,做溯源及下游影响分析时也是极不方便,只能怼人天靠人工,效率低下。HQL虽然低端并且性能不高,但是一时半会儿还不能被取缔。

原先在CDH5.16中,HQL任务是Hive On MapReduce。写在HQL最上方用于合并小文件的参数到了CDP就不能使用了:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

所以急需合并小文件,最好还是这种方式,就可以不用做太大的改动。

方式

直接set启用

既然之前的MapReduce任务有小文件合并的功能,那么找一找,还真就找到了更多的参数,在HiveConf这个Java类里。

在Apache Hive3.1.2的Java源码找到:

package org.apache.hadoop.hive.conf;

public class HiveConf extends Configuration 
HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
        "Merge small files at the end of a map-only job"),
    HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,
        "Merge small files at the end of a map-reduce job"),
    HIVEMERGETEZFILES("hive.merge.tezfiles", false, "Merge small files at the end of a Tez DAG"),
    HIVEMERGESPARKFILES("hive.merge.sparkfiles", false, "Merge small files at the end of a Spark DAG Transformation"),
    HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000),
        "Size of merged files at the end of the job"),
    HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000),
        "When the average output file size of a job is less than this number, Hive will start an additional \\n" +
        "map-reduce job to merge the output files into bigger files. This is only done for map-only jobs \\n" +
        "if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true."),
    HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),
    HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,
        "When hive.merge.mapfiles, hive.merge.mapredfiles or hive.merge.tezfiles is enabled\\n" +
        "while writing a table with ORC file format, enabling this config will do stripe-level\\n" +
        "fast merge for small ORC files. Note that enabling this config will not honor the\\n" +
        "padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
    MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,
      "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\\n" +
        "table there is at most 1 matching row in the source table per SQL Specification.")


参考:https://lizhiyong.blog.csdn.net/article/details/126688391

这篇有扒源码,看到了Hive最终会把AST解析成Tez或者Spark的DAG,那么此处的hive.merge.tezfileshive.merge.sparkfiles按照注释的字面意思,也就很容易看明白,是要在DAG的结尾处来一次merge。这当然就是最简单的方式。由于某些配置默认是false也就是停用状态,所以必须手动set启用才能生效。

所以只需要在HQL任务头上+这些参数即可:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.tezfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

使用Apache Hive自己搭建了Hive On Spark的难兄难弟们就可以照猫画虎:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.sparkfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

不过Hive的Calcite和CBO、RBO比起商业化DataBrick的Catalyst优势不明显,这种方式性能并不理想,使用SparkSQL的应该才是大多数。

验证

已经在prod环境充分验证,加入参数后敲:

hadoop fs -count
hadoop fs -du -s -h
hadoop fs -du -h

可以看到小文件情况明显改善。

到这里,肤浅的SQL Boy们就可以止步,已经够用了。

调整reducer个数

有时候,出于种种原因【数据量基本可以预知】,我们希望可以像Spark那样写死文件的个数。这样后续的任务调优、资源配额就比较方便。

Spark默认200个Task:

spark.sql.shuffle.partitions=200

只需要最后写文件或者sql跑insert overwrite前来一句免Shuffle高性能的:

df.coalesce(1)

或者肤浅的Sql Boy们比较能接受的Shuffle低性能的:

df1.repartition(1)

即可。Tez当然也是有办法实现这种写死文件个数的效果,那就是限制reducer个数,利用利用distribute by打散到reducer,每个reducer会写一个文件,最终文件个数就是写死的reducer个数。

验证

create external table if not exists test_small_file1(
	id int,
    message string
)
stored as parquet
location '这里写自己集群的即可'
;

create external table if not exists test_small_file2(
	id int,
    message string
)
partitioned by(
	dt string
)
stored as parquet
location '这里写自己集群的即可'
;

作为结果表。

create external table if not exists test_data_source(
	id int,
    message string,
    dt
)
stored as parquet
location '这里写自己集群的即可'
;

create external table if not exists test_data_source_100w(
	id int,
    message string,
    dt
)
stored as parquet
location '这里写自己集群的即可'
;

create external table if not exists test_data_source_1000w(
	id int,
    message string,
    dt
)
stored as parquet
location '这里写自己集群的即可'
;

作为取数的数据源表。

然后insert数据产生小文件:

insert into test_data_source values(1,'a1',20230310);
...	--这里自己填充
insert into test_data_source values(20,'a20',20230310);

insert into test_data_source values(21,'a21',20230311);
...
insert into test_data_source values(40,'a40',20230311);

insert into test_data_source values(100,'a100',20230312);
...
insert into test_data_source values(199,'a199',20230312);

insert into test_data_source values(200,'a200',20230313);
...
insert into test_data_source values(299,'a299',20230313);

此时在Impala执行:

show files in db_name.test_data_source;

或者直接HDFS查看:

hadoop fs -du -h

都可以看到240个小文件。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source;

可以看到结果是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source;

可以看到是6个小文件。

这点数据就会产生小文件。

Mock出百万级别的数据量:

insert overwrite table test_data_source_100w
select * from test_date_source;
;
insert overwrite table test_data_source_100w
select * from test_data_source_100w;
;

多次执行,直到数据量超过100w。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source_100w;

可以看到结果还是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w;

可以看到是8个小文件。

重头戏来了:

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file1
select id,message from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

打散为10个小文件。

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

由于4个日期,所以打散成40个小文件。

之后Mock出千万级别的也类似。直接说结果:

最终test_small_file1表有5个小文件,test_small_file2有20个小文件。

但是使用参数后:

最终test_small_file1表有10个小文件,test_small_file2有40个小文件。

所以可以看出,distribute by cast(rand() * 100 as int)这个操作利用了Hash Partitioner,结果散步到了Reduce Task。最后文件的个数=Reduce Task个数。

分区表是每个Partition的结果再次Hash Partitioner打散,所以每个分区路径的parquet文件个数都是=Reduce Task个数。

由于hive.exec.reducers.bytes.per.reduce可以设置的很大,那么只需要修改mapreduce.job.reduces的值,就可以让Tez跑HQL任务时写固定个数的文件。

ACID表的ORC小文件

由于Sql Boy们比较顽固,总是想着把Hive当RDBMS来用,于是。。。说多了都是泪。。。

Hive的ACID表会产生大量的小文件,阈值配置的不合适时,触发合并的次数就很少,导致小文件越来越多。这有点像Hudi的MOR【merge on read】。当下游HQL任务从这些ACID表读数据时,就会由于Map Task过多,出现极其严重的性能问题,这个故事慢慢讲。于是Sql Boy们养成了手动合并小文件的好习惯:

alter table tb_name compact 'major';
alter table tb_name compact 'minor';

可以通过:

show compactions;

查看提交到Yarn的合并任务的记录,自己把unix的timestamp换算成人类能看懂的时间,就知道近期的合并情况。。。任务跑得慢,想起来了就手动合并一下。。。

总结

Hive On Tez主要就是这3种合并小文件的方式。


转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129511318

以上是关于Hive On Tez小文件合并的技术调研的主要内容,如果未能解决你的问题,请参考以下文章

Hive on Tez

记一发Hive on tez的配置(Hive 3.1.1, Hadoop 3.0.3, Tez 0.9.1)

hive on tez 错误记录

hive on tez配置

hive on spark VS SparkSQL VS hive on tez

hive on spark VS SparkSQL VS hive on tez