如何避免Spark SQL做数据导入时产生大量小文件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何避免Spark SQL做数据导入时产生大量小文件相关的知识,希望对你有一定的参考价值。

参考技术A 生产上,我们往往将Spark SQL作为Hive的替代方案,来获得SQL on Hadoop更出色的性能。因此,本文所讲的是指存储于HDFS中小文件,即指文件的大小远小于HDFS上块(dfs.block.size)大小的文件。

比如我们拿TPCDS测试集中的store_sales进行举例, sql如下所示

首先我们得到其执行计划,如下所示,

store_sales的原生文件包含1616逻辑分片,对应生成1616 个Spark Task,插入动态分区表之后生成1824个数据分区加一个NULL值的分区,每个分区下都有可能生成1616个文件,这种情况下,最终的文件数量极有可能达到2949200。1T的测试集store_sales也就大概300g,这种情况每个文件可能就零点几M。

比如,为了防止Shuffle阶段的数据倾斜我们可以在上面的sql中加上 distribute by rand() ,这样我们的执行计划就变成了,

这种情况下,这样我们的文件数妥妥的就是spark.sql.shuffle.partitions * N,因为rand函数一般会把数据打散的非常均匀。当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

最理想的情况,当然是根据分区字段进行shuffle,在上面的sql中加上 distribute by ss_sold_date_sk 。 把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产生N个文件,在我们的case中store_sales,在1825个分区下各种生成了一个数据文件。
但是这种情况下也容易出现数据倾斜的问题,比如双11的销售数据就很容易在这种情况下发生倾斜。

前面已经提到根据分区字段进行分区,除非每个分区下本身的数据较少,分区字段选择不合理,那么小文件问题基本上就不存在了,但是也有可能由于shuffle引入新的数据倾斜问题。

我们首先可以尝试是否可以将两者结合使用, 在之前的sql上加上 distribute by ss_sold_date_sk,cast(rand() * 5 as int) , 这个类似于我们处理数据倾斜问题时候给字段加上后缀的形式。如,

按照之前的推算,每个分区下将产生5个文件,同时null值倾斜部分的数据也被打散成五份进行计算,缓解了数据倾斜的问题 ,我们最终将得到1825 *5=9105个文件,如下所示

如果我们将5改得更小,文件数也会越少,但相应的倾斜key的计算时间也会上去。

在我们知道那个分区键倾斜的情况下,我们也可以将入库的SQL拆成几个部分,比如我们store_sales是因为null值倾斜,我们就可以通过 where ss_sold_date_sk is not null 和 where ss_sold_date_sk is null 将原始数据分成两个部分。前者可以基于分区字段进行分区,如 distribute by ss_sold_date_sk ;后者可以基于随机值进行分区, distribute by cast(rand() * 5 as int) , 这样可以静态的将null值部分分成五个文件。

对于倾斜部分的数据,我们可以开启Spark SQL的自适应功能, spark.sql.adaptive.enabled=true 来动态调整每个相当于Spark的reduce端task处理的数据量,这样我们就不需要人为的感知随机值的规模了,我们可以直接

然后Spark在Shuffle 阶段会自动的帮我们将数据尽量的合并成 spark.sql.adaptive.shuffle.targetPostShuffleInputSize (默认64m)的大小,以减少输出端写文件线程的总量,最后减少个数。
对于 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 参数而言,我们也可以设置成为 dfs.block.size 的大小,这样可以做到和块对齐,文件大小可以设置的最为合理。

在我们的 猛犸大数据平台 上面,随便的建立几个SQL作业,不用会Spark也可以用SQL把大数据玩得666!

双击每个工作节点,我们也可以对我们的SQL作业进行参数的调整

选中我们对应的实验组,点击执行后,可以查看任务的运行状态。

从各组的实验结果来看

实验组一的小文件控制还是可喜可贺的。对于我们1t的tpcds测试数据,null值分区字段下只有40个文件,其他每个数据分区也只有一个数据文件,总目录1825,总文件数1863. 在解决数据倾斜问题的基础上,也只比纯按照分区字段进行distibute by多了39个文件。

本文讲述的是如何在纯写SQL的场景下,如何用Spark SQL做数据导入时候,控制小文件的数量。

PostgreSQL数据库导入大量数据时如何优化

在使用PostgreSQL的时候,我们某些时候会往库里插入大量数据,例如,导入测试数据,导入业务数据等等。本篇文章介绍了在导入大量数据时的一些可供选择的优化手段。可以结合自己的情况进行选择。

一、关闭自动提交

关闭自动提交,并且只在每次(数据拷贝)结束的时候做一次提交。

如果允许每个插入都独立地提交,那么PostgreSQL会为所增加的每行记录做大量的处理。而且在一个事务里完成所有插入的动作的最大的好处就是,如果有一条记录插入失败, 那么,到该点为止的所有已插入记录都将被回滚,这样就不会面对只有部分数据,数据不完整的问题。

二、导入阶段不创建索引,或者导入阶段删除索引

如果你正导入一张表的数据,最快的方法是创建表,用COPY批量导入,然后创建表需要的索引。在已存在数据的表上创建索引要比递增地更新表的每一行记录要快。

如果你对现有表增加大量的数据,可以先删除索引,导入表的数据,然后重新创建索引。当然,在缺少索引的期间,其它数据库用户的数据库性能将有负面的影响。并且我们在删除唯一索引之前还需要仔细考虑清楚,因为唯一约束提供的错误检查在缺少索引的时候会消失。(慎重考虑索引带来的影响)

三、删除外键约束

和索引一样,整体地检查外键约束比检查递增的数据行更高效。所以我们也可以删除外键约束,导入表地数据,然后重建约束会更高效。

四、增大maintenance_work_mem

在装载大量的数据的时候,临时增大 maintenance_work_mem 可以改进性能。这个参数也可以帮助加速CREATE INDEX和ALTER TABLE ADD FOREIGN KEY命令。它不会对COPY本身有很大作用,但是它可以加速创建索引和外键约束。

postgres=# show maintenance_work_mem;
 maintenance_work_mem
----------------------
 64MB
(1 row)

五、单值insert改多值insert

减少SQL解析的时间。

六、关闭归档模式并降低wal日志级别

当使用WAL归档或流复制向一个安装中录入大量数据时,在导入数据结束时,执行一次新的basebackup比执行一次增量WAL更快。

为了防止录入时的增量WAL,可以将wal_level暂时调整为minimal, archive_modet关闭,max_wal_senders设置为0来禁用归档和流复制。但需修改这些设置需要重启服务。

postgres=# show wal_level;
 wal_level
-----------
 minimal
(1 row)

postgres=# show  archive_mode;
 archive_mode
--------------
 off
(1 row)

postgres=# show max_wal_senders;
 max_wal_senders
-----------------
 0
(1 row)

七、增大max_wal_size

临时增大max_wal_size配置变量也可以让大量数据载入更快。这是因为向PostgreSQL中载入大量的数据将导致检查点的发生比平常(由checkpoint_timeout配置变量指定)更频繁。

发生检查点时,所有脏页都必须被刷写到磁盘上。通过在批量数据载入时临时增加max_wal_size,减少检查点的数目。

postgres=# show max_wal_size;
 max_wal_size
--------------
 1GB
(1 row)

八、使用copy替代insert

COPY针对批量数据加载进行了优化。

COPY命令是为装载数量巨大的数据行优化过的;它没INSERT那么灵活,但是在大量装载数据的情况下,导致的荷载也少很多。因为COPY是单条命令,因此填充表的时候就没有必要关闭自动提交了。

如果不能使用COPY,可以使用PREPARE来创建一个预备INSERT,然后使用EXECUTE多次效率更高。这样就避免了重复分析和规划INSERT的开销。

九、禁用触发器

导入数据之前先DISABLE掉相关表上的触发器,导入完成后重新让他ENABLE。

ALTER TABLE tab_1 DISABLE TRIGGER ALL;
导入数据
ALTER TABLE tab_1 ENABLE TRIGGER ALL;

十、相关导数工具:pg_bulkload

pg_bulkload 是 PostgreSQL 的一个高速数据加载工具,相对于 copy 命令。最大的优势是速度。在 pg_bulkload 的直接模式下,它将跳过共享缓冲区和 WAL 缓冲区,直接写入文件。它还包括数据恢复功能,可在导入失败时进行恢复。

地址:https://github.com/ossc-db/pg_bulkload

十一、导入数据后,使用analyze

运行ANALYZE 或者VACUUM ANALYZE可以保证规划器有表数据的最新统计。

如果没有统计数据或者统计数据太陈旧,那么规划器可能选择性能很差的执行计划,导致表的查询性能较差。

以上是关于如何避免Spark SQL做数据导入时产生大量小文件的主要内容,如果未能解决你的问题,请参考以下文章

如何避免 BigQuery 中的 Power BI 增量刷新重复查询?

spark性能调优05-troubleshooting处理

转载执行sql文件限制频率避免流控

转载执行sql文件限制频率避免流控

如何对数据进行物理分区以避免 Spark SQL 连接中的洗牌

Spark SQL - 从 oracle 导入时将 oracle 日期数据类型错误转换为时间戳(java.sql)