测试ClickHouse中写入Parquet格式数据到Hive

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了测试ClickHouse中写入Parquet格式数据到Hive相关的知识,希望对你有一定的参考价值。

参考技术A 先在HDFS创建目录

在CK创建表

创建CK表成功

写入成功,

到HDFS上查看一下

看到数据啦

创建HIVE表关联刚刚的文件

执行查询语句

数据出来啦!!

使用Hive SQL插入动态分区的Parquet表OOM异常分析

参考技术A

1.异常描述

当运行“INSERT ... SELECT”语句向 Parquet 或者 ORC 格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行。

Hive 客户端:

(可左右滑动)

YARN 的 8088 中查看具体 map task 报错:

(可左右滑动)

2.异常分析

Parquet 和 ORC 是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行 INSERT 语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致 mappers 或 reducers 的 OOM,具体取决于打开的文件写入器(file writer)的数量。

通过 INSERT 语句插入数据到动态分区表中,也可能会超过 HDFS 同时打开文件数的限制。

如果没有 join 或聚合,INSERT ... SELECT 语句会被转换为只有 map 任务的作业。mapper 任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个 mapper 必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper 在运行时所需的内存量随着它遇到的分区数量的增加而增加。

3.异常重现与解决

3.1.生成动态分区的几个参数说明

hive.exec.dynamic.partition

默认值:false

是否开启动态分区功能,默认 false 关闭。

使用动态分区时候,该参数必须设置成 true;

hive.exec.dynamic.partition.mode

默认值:strict

动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。

一般需要设置为 nonstrict

hive.exec.max.dynamic.partitions.pernode

默认值:100

在每个执行 MR 的节点上,最大可以创建多少个动态分区。

该参数需要根据实际的数据来设定。

比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。

hive.exec.max.dynamic.partitions

默认值:1000

在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。

同上参数解释。

hive.exec.max.created.files

默认值:100000

整个 MR Job 中,最大可以创建多少个 HDFS 文件。

一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于 100000,可根据实际情况加以调整。

mapreduce.map.memory.mb

map 任务的物理内存分配值,常见设置为 1GB,2GB,4GB 等。

mapreduce.map.java.opts

map 任务的 Java 堆栈大小设置,一般设置为小于等于上面那个值的 75%,这样可以保证 map 任务有足够的堆栈外内存空间。

mapreduce.input.fileinputformat.split.maxsize

mapreduce.input.fileinputformat.split.minsize

这个两个参数联合起来用,主要是为了方便控制 mapreduce 的 map 数量。比如我设置为 1073741824,就是为了让每个 map 处理 1GB 的文件。

3.2.一个例子

Fayson 在前两天给人调一个使用 Hive SQL 插入动态分区的 Parquet 表时,总是报错 OOM,也是折腾了很久。以下我们来看看整个过程。

1.首先我们看看执行脚本的内容,基本其实就是使用 Hive 的 insert 语句将文本数据表插入到另外一张 parquet 表中,当然使用了动态分区。

2.我们看看原始数据文件,是文本文件,一共 120 个,每个 30GB 大小,总共差不多 3.6TB。

3.我们看看报错

4.因为是一个只有 map 的 mapreduce 任务,当我们从 YARN 的 8088 观察这个作业时可以发现,基本没有一个 map 能够执行成功,全部都是失败的。报上面的错误。

5.把 mapreduce.map.memory.mb 从 2GB 增大到 4GB,8GB,16GB,相应 mapreduce.map.java.opts 增大到 3GB,6GB,12GB。依旧报错 OOM。

6.后面又将 mapreduce.input.fileinputformat.split.maxsize 从 1GB,减少为 512MB,256MB,从而增大 map 数量,缩小单个 map 处理文件的大小。依旧报错 OOM。

7.最后启用 hive.optimize.sort.dynamic.partition,增加 reduce 过程,作业执行成功。

8.最后查看结果文件大约 1.2TB,约为输入文件的三分之一。一共 1557 个分区,最大的分区文件为 2GB。

4.异常总结

对于这个异常,我们建议有以下三种方式来处理:

1.启用 hive.optimize.sort.dynamic.partition,将其设置为 true。通过这个优化,这个只有 map 任务的 mapreduce 会引入 reduce 过程,这样动态分区的那个字段比如日期在传到 reducer 时会被排序。由于分区字段是排序的,因此每个 reducer 只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写 parquet 文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。

2.第二种方式就是增加每个 mapper 的内存分配,即增大 mapreduce.map.memory.mb 和 mapreduce.map.java.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。

3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个 mapper 打开较少的文件写入器(file writer)。

备注:

默认情况下,Hive 为每个打开的 Parquet 文件缓冲区(file buffer)分配 128MB。这个 buffer 大小由参数 parquet.block.size 控制。为获得最佳性能,parquet 的 buffer size 需要与 HDFS 的 block size 保持对齐(比如相等),从而使每个 parquet 文件在单个 HDFS 的块中,以便每个 I/O 请求都可以读取整个数据文件,而无需通过网络传输访问后续的 block。

参考:

https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

http://blog.cloudera.com/blog/2014/03/how-to-use-parquet-with-impala-hive-pig-mapreduce/

https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_parquet.html

https://issues.cloudera.org/browse/IMPALA-2521

https://issues.apache.org/jira/browse/HIVE-6455

http://blog.csdn.net/qq_26937525/article/details/54946281

以上是关于测试ClickHouse中写入Parquet格式数据到Hive的主要内容,如果未能解决你的问题,请参考以下文章

AWS Glue 作业以 Parquet 格式写入 s3 并出现 Not Found 错误

Flink实战系列Flink使用StreamingFileSink写入HDFS(parquet格式snappy压缩)

由于某些字段的值未知,使用 Spark 写入 parquet 文件的数字格式异常 [重复]

Clickhouse基于TCP的一个clickhouse写入软件clickhouse_sinker性能测试

Parquet性能测试之项目实践中应用测试

使用经 EMRFS S3 优化的提交器提高 Apache Spark 写入 Apache Parquet 格式文件的性能