HIVE:小文件合并

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HIVE:小文件合并相关的知识,希望对你有一定的参考价值。

参考技术A HDFS非常容易存储大数据文件,如果Hive中存在过多的小文件会给namecode带来巨大的性能压力。同时小文件过多会影响JOB的执行,hadoop会将一个job转换成多个task,即使对于每个小文件也需要一个task去单独处理,task作为一个独立的jvm实例,其开启和停止的开销可能会大大超过实际的任务处理时间。

同时我们知道hive输出最终是mr的输出,即reducer(或mapper)的输出,有多少个reducer(mapper)输出就会生成多少个输出文件,根据shuffle/sort的原理,每个文件按照某个值进行shuffle后的结果。

为了防止生成过多小文件,hive可以通过配置参数在mr过程中合并小文件。而且在执行sql之前将小文件都进行Merge,也会提高程序的性能。我们可以从两个方面进行优化,其一是map执行之前将小文件进行合并会提高性能,其二是输出的时候进行合并压缩,减少IO压力。

HDFS的文件元信息,包括位置、大小、分块信息等,都是保存在NameNode的内存中的。每个对象大约占用150个字节,因此一千万个文件及分块就会占用约3G的内存空间,一旦接近这个量级,NameNode的性能就会开始下降了。此外,HDFS读写小文件时也会更加耗时,因为每次都需要从NameNode获取元信息,并与对应的DataNode建立连接。对于MapReduce程序来说,小文件还会增加Mapper的个数,每个脚本只处理很少的数据,浪费了大量的调度时间。当然,这个问题可以通过使用CombinedInputFile和JVM重用来解决。

汇总后的数据量通常比源数据要少得多。而为了提升运算速度,我们会增加Reducer的数量,Hive本身也会做类似优化——Reducer数量等于源数据的量除以hive.exec.reducers.bytes.per.reducer所配置的量(默认1G)。Reducer数量的增加也即意味着结果文件的增加,从而产生小文件的问题。

【参考】
https://blog.csdn.net/djd1234567/article/details/51581201

Hive任务优化—— 小文件合并相关参数

文章目录

一、Hive提供的文件合并功能

熟悉hdfs的都知道,hdfs不建议存储小文件,原因是大量的小文件会给namenode带来太大的负担。因此,我们在使用hdfs过程中应该尽量保证输出到hdfs的文件不会有大量零碎的小文件。

在大多数情况下,hive都是将数据文件存储在hdfs上。当hive执行类似insert overwrite directory或者insert table等语句时,都会往hdfs写文件。具体生成的文件数量和文件大小则和任务的数据量、执行计划有关。如果我们可以准确的知道一个任务的输入数据量以及对应的执行计划,那么我们则可以通过调整hive任务的map数量和reduce数量来控制最终落地到hdfs的文件数量(一个任务落地的数据量大小肯定是固定的,我们能调整的只有文件数量)。这样就可以尽量保证这个任务落地的文件不会是零碎的小文件了。

但是作为集群管理者,我们肯定无法知道所有任务的数据量和执行计划,也不可能对每个任务进行这些参数调整的优化。Hive也考虑到这一点,因此在早期的版本中就加入了相关参数来控制任务输出的文件数量。

参数名作用
hive.merge.mapfiles是否在纯Map的任务(没有reduce task)后开启小文件合并
hive.merge.mapredfiles是否在mapreduce任务后开启小文件合并
hive.merge.sparkfiles是否在hive on spark任务后开启小文件合并
hive.merge.smallfiles.avgsize如果原先输出的文件平均大小小于这个值,则开启小文件合并。比如输出原本有100个文件,总大小1G,那平均每个文件大小只有10M,如果我们这个参数设置为16M,这时就会开启文件合并
hive.merge.size.per.task开启小文件合并后,预期的一个合并文件的大小。比如原先的总大小有1G,我们预期一个文件256M的话,那么最终经过合并会生成4个文件。

hive文件合并的实现原理

Hive在任务结束后,不同的引擎根据不同的参数来判断是否需要进行文件合并检查。比如如果使用的spark引擎,则需要设置hive.merge.sparkfiles为true,如果mapreduce引擎,则设置hive.merge.mapredfiles为true。

是否开启文件合并检查我们可以通过hive的执行计划来判断。我们先将hive.merge.sparkfiles设置为false,然后执行以下语句输出执行计划:

explain insert overwrite directory '/tmp/' select * from yjbtest.test;

最后得到如下执行计划:

+----------------------------------------------------+
|                      Explain                       |
+----------------------------------------------------+
| STAGE DEPENDENCIES:                                |
|   Stage-1 is a root stage                          |
|   Stage-0 depends on stages: Stage-1               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-1                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909151818_944f4632-cd15-4263-ad0e-f210aff3d6b3:3028 |
|       Vertices:                                    |
|         Map 1                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: test                      |
|                   Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                   Select Operator                  |
|                     expressions: id (type: int), name (type: string) |
|                     outputColumnNames: _col0, _col1 |
|                     Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                     File Output Operator           |
|                       compressed: false            |
|                       Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                       table:                       |
|                           input format: org.apache.hadoop.mapred.TextInputFormat |
|                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-0                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: /tmp                        |
|                                                    |
+----------------------------------------------------+

上面的执行计划也很简单,就是分成两个阶段,Stage-1完成后会将结果输出到某个临时目录,之后Stage-0直接Move到最终的输出目录即可(不会进行小文件合并)。

之后我们把hive.merge.sparkfiles参数设置为true,再输出执行计划,会得到:

+----------------------------------------------------+
|                      Explain                       |
+----------------------------------------------------+
| STAGE DEPENDENCIES:                                |
|   Stage-1 is a root stage                          |
|   Stage-6 depends on stages: Stage-1 , consists of Stage-3, Stage-2, Stage-4 |
|   Stage-3                                          |
|   Stage-0 depends on stages: Stage-3, Stage-2, Stage-5 |
|   Stage-2                                          |
|   Stage-4                                          |
|   Stage-5 depends on stages: Stage-4               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-1                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909152045_69d1724e-184a-4883-b38d-3aad1334f787:3029 |
|       Vertices:                                    |
|         Map 1                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: test                      |
|                   Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                   Select Operator                  |
|                     expressions: id (type: int), name (type: string) |
|                     outputColumnNames: _col0, _col1 |
|                     Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                     File Output Operator           |
|                       compressed: false            |
|                       Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                       table:                       |
|                           input format: org.apache.hadoop.mapred.TextInputFormat |
|                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-6                                   |
|     Conditional Operator                           |
|                                                    |
|   Stage: Stage-3                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: hdfs://test45cluster/tmp/.hive-staging_hive_2019-09-09_15-20-45_204_1372713564087112462-2811/-ext-10000 |
|                                                    |
|   Stage: Stage-0                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: /tmp                        |
|                                                    |
|   Stage: Stage-2                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909152045_69d1724e-184a-4883-b38d-3aad1334f787:3030 |
|       Vertices:                                    |
|         Spark Merge File Work                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   File Output Operator             |
|                     compressed: false              |
|                     table:                         |
|                         input format: org.apache.hadoop.mapred.TextInputFormat |
|                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-4                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909152045_69d1724e-184a-4883-b38d-3aad1334f787:3030 |
|       Vertices:                                    |
|         Spark Merge File Work                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   File Output Operator             |
|                     compressed: false              |
|                     table:                         |
|                         input format: org.apache.hadoop.mapred.TextInputFormat |
|                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-5                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: hdfs://test45cluster/tmp/.hive-staging_hive_2019-09-09_15-20-45_204_1372713564087112462-2811/-ext-10000 |
|                                                    |
+----------------------------------------------------+

我们看到多了很多Stage,这些Stage就是和文件合并相关的。其中关键是Stage-6(Stage 1执行完后执行它),它是一个ConditionalTask,主要根据一些条件判断要执行哪个Stage:

  • Stage 3:不执行文件合并,直接Move数据到目标目录
  • Stage 2:开启一个Job,读取之前临时目录的数据,然后输出到另一个临时目录
  • Stage 4:开启一个Job,读取之前临时目录的数据,然后输出到另一个临时目录(和Stage 2不同的是它后面还接了一个Stage 5。这种情况主要是在动态分区时,有些分区目录的文件已经不需要合并了,所以把整个合并流程分成merge and move)

总之,如果根据参数 hive.merge.smallfiles.avgsize 发现需要做小文件合并,则开启一个新的Job进行小文件合并。这就是Hive进行文件合并的真相。

二、文件合并在Hive on Spark中的失效问题

Hive中的hive.merge.size.per.task参数是用来控制合并后的文件的预期大小的,但是我们在hive on spark的测试过程中发现这个参数并没有生效,而在hive on mr中却可以生效。

一开始怀疑是不是hive on spark中没开启文件合并的job,但是看了hive的执行日志和job的执行记录,都表明已经开启了一个新的Job来合并文件,只是这个合并Job的map task数量和预期的有出入。

我们已经知道Hive会提交一个新的Job来合并文件,很明显,这个Job应该是一个Map Only的任务,因此Map task的数量就是最终文件的数量。但是通过实际观察,在Hive on Spark中,无论hive.merge.size.per.task参数怎么调整,合并Job的Map task数量都不会调整。

既然和Map task的数量有关,那么我们可以回想一下正常情况下,我们要怎么调整一个Job的Map task数量。主要和以下参数有关:

  • maxSize: 通过mapreduce.input.fileinputformat.split.maxsize参数设置
  • minSizePerNode: 通过mapreduce.input.fileinputformat.split.minsize.per.node参数设置
  • minSizePerRack:通过mapreduce.input.fileinputformat.split.minsize.per.rack参数设置

具体的可以看这篇博客:【Hive任务优化】—— Map、Reduce数量调整

所以正常情况下,Map数量和hive.merge.size.per.task是没有关系的。那么hive.merge.size.per.task是怎么作用到这几个参数上,来修改合并Job的Map task数量呢?

因为Hive on Mr是可以生效的,因此看了下相关代码。发现Hive在确定需要合并时,会往相关的Task的MapWork中设置hive.merge.size.per.task的值。之后提交MapReduce任务时,会获取这个值,然后设置到Job参数中(就是设置影响map task的那几个参数):

//ExecDriver.java
public static void propagateSplitSettings(JobConf job, MapWork work) 
  if (work.getNumMapTasks() != null) 
    job.setNumMapTasks(work.getNumMapTasks().intValue());
  
 
  if (work.getMaxSplitSize() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
  
 
  if (work.getMinSplitSize() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
  
 
  if (work.getMinSplitSizePerNode() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
  
 
  if (work.getMinSplitSizePerRack() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
  

后面又找了下Hive on spark提交Job的相关代码,并没有找到设置相关参数的代码

解决方案

修改Hive源码,在Hive on spark提交Job的代码加上设置相关参数的代码:

//RemoteHiveSparkClient.java类的submit方法
//update the credential provider location in the jobConf
HiveConfUtil.updateJobCredentialProviders(jobConf);
//插入代码段开始
try 
  //从work中获取相关的参数,提前设置相关参数。不然spark在进行文件合并时不会生效
  MapWork mapWork = (MapWork) sparkWork.getAllWork().get(0);
  propagateSplitSettings(jobConf,mapWork);
 catch (Exception e) 
  e.printStackTrace();

//插入代码段结束
// Create temporary scratch dir
final Path emptyScratchDir = ctx.getMRTmpPath();

三、开启文件合并的优缺点

我们知道,文件合并需要开启一个新的Job,因此任务整体性能肯定会降低,使用的资源也会变多,这是文件合并的缺点

但是从长远来看,还是开启文件合并比较好。因为过多的小文件如果不断积累,会给hdfs集群带来难以想象的负担。当集群因为小文件数量达到瓶颈,要处理起这些小文件也需要一定的工作量。与其这样,不如提前开启文件合并,避免过多的小文件生成。

或者使用折中方案,我们可以调低hive.merge.smallfiles.avgsize的值,这样就可以只针对那些真正很零散的小文件进行合并。

以上是关于HIVE:小文件合并的主要内容,如果未能解决你的问题,请参考以下文章

Hive任务优化—— 小文件合并相关参数

如何合并 sparksql 保存在 hive 上的小文件?

text Hive优化小文件合并

hive合并小文件

HIVE优化和数据倾斜合并小文件

hive小文件合并设置参数