使用Hive SQL插入动态分区的Parquet表OOM异常分析
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Hive SQL插入动态分区的Parquet表OOM异常分析相关的知识,希望对你有一定的参考价值。
参考技术A1.异常描述
当运行“INSERT ... SELECT”语句向 Parquet 或者 ORC 格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行。
Hive 客户端:
(可左右滑动)
YARN 的 8088 中查看具体 map task 报错:
(可左右滑动)
2.异常分析
Parquet 和 ORC 是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行 INSERT 语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致 mappers 或 reducers 的 OOM,具体取决于打开的文件写入器(file writer)的数量。
通过 INSERT 语句插入数据到动态分区表中,也可能会超过 HDFS 同时打开文件数的限制。
如果没有 join 或聚合,INSERT ... SELECT 语句会被转换为只有 map 任务的作业。mapper 任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个 mapper 必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper 在运行时所需的内存量随着它遇到的分区数量的增加而增加。
3.异常重现与解决
3.1.生成动态分区的几个参数说明
hive.exec.dynamic.partition
默认值:false
是否开启动态分区功能,默认 false 关闭。
使用动态分区时候,该参数必须设置成 true;
hive.exec.dynamic.partition.mode
默认值:strict
动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。
一般需要设置为 nonstrict
hive.exec.max.dynamic.partitions.pernode
默认值:100
在每个执行 MR 的节点上,最大可以创建多少个动态分区。
该参数需要根据实际的数据来设定。
比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。
hive.exec.max.dynamic.partitions
默认值:1000
在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。
同上参数解释。
hive.exec.max.created.files
默认值:100000
整个 MR Job 中,最大可以创建多少个 HDFS 文件。
一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于 100000,可根据实际情况加以调整。
mapreduce.map.memory.mb
map 任务的物理内存分配值,常见设置为 1GB,2GB,4GB 等。
mapreduce.map.java.opts
map 任务的 Java 堆栈大小设置,一般设置为小于等于上面那个值的 75%,这样可以保证 map 任务有足够的堆栈外内存空间。
mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize
这个两个参数联合起来用,主要是为了方便控制 mapreduce 的 map 数量。比如我设置为 1073741824,就是为了让每个 map 处理 1GB 的文件。
3.2.一个例子
Fayson 在前两天给人调一个使用 Hive SQL 插入动态分区的 Parquet 表时,总是报错 OOM,也是折腾了很久。以下我们来看看整个过程。
1.首先我们看看执行脚本的内容,基本其实就是使用 Hive 的 insert 语句将文本数据表插入到另外一张 parquet 表中,当然使用了动态分区。
2.我们看看原始数据文件,是文本文件,一共 120 个,每个 30GB 大小,总共差不多 3.6TB。
3.我们看看报错
4.因为是一个只有 map 的 mapreduce 任务,当我们从 YARN 的 8088 观察这个作业时可以发现,基本没有一个 map 能够执行成功,全部都是失败的。报上面的错误。
5.把 mapreduce.map.memory.mb 从 2GB 增大到 4GB,8GB,16GB,相应 mapreduce.map.java.opts 增大到 3GB,6GB,12GB。依旧报错 OOM。
6.后面又将 mapreduce.input.fileinputformat.split.maxsize 从 1GB,减少为 512MB,256MB,从而增大 map 数量,缩小单个 map 处理文件的大小。依旧报错 OOM。
7.最后启用 hive.optimize.sort.dynamic.partition,增加 reduce 过程,作业执行成功。
8.最后查看结果文件大约 1.2TB,约为输入文件的三分之一。一共 1557 个分区,最大的分区文件为 2GB。
4.异常总结
对于这个异常,我们建议有以下三种方式来处理:
1.启用 hive.optimize.sort.dynamic.partition,将其设置为 true。通过这个优化,这个只有 map 任务的 mapreduce 会引入 reduce 过程,这样动态分区的那个字段比如日期在传到 reducer 时会被排序。由于分区字段是排序的,因此每个 reducer 只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写 parquet 文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。
2.第二种方式就是增加每个 mapper 的内存分配,即增大 mapreduce.map.memory.mb 和 mapreduce.map.java.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。
3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个 mapper 打开较少的文件写入器(file writer)。
备注:
默认情况下,Hive 为每个打开的 Parquet 文件缓冲区(file buffer)分配 128MB。这个 buffer 大小由参数 parquet.block.size 控制。为获得最佳性能,parquet 的 buffer size 需要与 HDFS 的 block size 保持对齐(比如相等),从而使每个 parquet 文件在单个 HDFS 的块中,以便每个 I/O 请求都可以读取整个数据文件,而无需通过网络传输访问后续的 block。
参考:
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
http://blog.cloudera.com/blog/2014/03/how-to-use-parquet-with-impala-hive-pig-mapreduce/
https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_parquet.html
https://issues.cloudera.org/browse/IMPALA-2521
https://issues.apache.org/jira/browse/HIVE-6455
http://blog.csdn.net/qq_26937525/article/details/54946281
保存分区 Parquet
【中文标题】保存分区 Parquet【英文标题】:Saving partitioned Parquet 【发布时间】:2016-10-31 11:09:37 【问题描述】:我想将 df 保存为分区镶木地板,因为逐条插入记录需要时间。任何人都可以帮助我如何保存它并指向蜂巢表。
【问题讨论】:
实际上我一直在增量获取数据,我正在使用动态分区,而不是插入我想要直接保存的数据,作为由一列分区的镶木地板。 【参考方案1】:df.write.parquet("/parquet/file/path")
然后你可以创建一个Hive external table
指向 parquet 文件位置。
【讨论】:
实际上它一直在增量获取数据,我使用动态分区,而不是插入我想要直接保存的数据,作为由一列分区的镶木地板。 @HEMANTHKUMAR:你说的由一列分隔的镶木地板是什么意思?你的意思是 Hive 表分区? 是的,我已经对 hive 表进行了分区 目前我正在将记录记录插入存储为镶木地板的配置单元中,但我想将 df 直接保存为分区镶木地板,而不是逐条插入记录。 @HEMANTHKUMAR:上面的代码和你期望的一样。它不会在 Hive 中插入记录。它将DF存储为镶木地板文件(分区)以上是关于使用Hive SQL插入动态分区的Parquet表OOM异常分析的主要内容,如果未能解决你的问题,请参考以下文章