你如何控制输出文件的大小?
Posted
技术标签:
【中文标题】你如何控制输出文件的大小?【英文标题】:How do you control the size of the output file? 【发布时间】:2017-01-04 09:14:40 【问题描述】:在 spark 中,控制输出文件大小的最佳方法是什么。例如,在 log4j 中,我们可以指定最大文件大小,之后文件会旋转。
我正在为镶木地板文件寻找类似的解决方案。写入文件时是否有最大文件大小选项可用?
我有几个解决方法,但没有一个是好的。如果我想将文件限制为 64mb,那么一种选择是重新分区数据并写入临时位置。然后使用临时位置中的文件大小将文件合并在一起。但是获得正确的文件大小是很困难的。
【问题讨论】:
只是想知道输出文件中相同大小的用例是什么。 试图保持文件大小一致。例如,当我在不同的分区中写入文件时,某些分区文件要大 10 倍。 df.repartition(35).write.mode(SaveMode.Overwrite).partitionBy(list:_*).parquet("tmp5") 【参考方案1】:Spark 无法控制 Parquet 文件的大小,因为内存中的 DataFrame 需要在写入磁盘之前进行编码和压缩。在此过程完成之前,无法估计磁盘上的实际文件大小。
所以我的解决方案是:
将 DataFrame 写入 HDFS,df.write.parquet(path)
获取目录大小并计算文件数
val fs = FileSystem.get(sc.hadoopConfiguration)
val dirSize = fs.getContentSummary(path).getLength
val fileNum = dirSize/(512 * 1024 * 1024) // let's say 512 MB per file
读取目录并重新写入HDFS
val df = sqlContext.read.parquet(path)
df.coalesce(fileNum).write.parquet(another_path)
不要重复使用原来的df
,否则会触发你的工作两次。
删除旧目录并重新命名新目录
fs.delete(new Path(path), true)
fs.rename(new Path(newPath), new Path(path))
这个方案有个缺点,就是需要两次写入数据,磁盘IO翻倍,但目前只有这个方案。
【讨论】:
我可以用 Spark SQL 做类似的事情吗?我想控制 fileNum 并且不太关心每个文件的文件大小。 @soulmachine - 您能否详细说明“不要重复使用原始 df,否则会触发您的工作两次。”【参考方案2】:这是我的解决方案,对我来说很有趣。
val repartition_num = 20
val hqc = new org.apache.spark.sql.hive.HiveContext(sc)
val t1 = hqc.sql("select * from customer")
// 20 parquet files will be generated in hdfs dir
// JUST control your file with partition number
t1.repartition(repartition_num ).saveAsParquetFile(parquet_dir)
结果如下:
> hadoop fs -ls /tpch-parquet/customer/*.parquet | wc -l
20
【讨论】:
-1。这没有回答 OP 的问题(控制文件大小),而是一个完全不同的问题(控制文件数量) 如果您重新分区到更大的数字,它也会缩小平均文件大小。 wfm【参考方案3】:正如其他人所提到的,您不能明确地达到每个文件的目标大小。但是,您可以让所有输出文件的行数大致相同。如果您平均知道压缩比是什么样的,那么在输出文件中均匀分布行(最大为 max_rows)将使您的目标大小保持一致。
如果你在写之前做一个partitionBy,这说起来容易做起来难。下面是我们如何做到这一点的一些伪代码:
-- #3 distribute partitionC's rows based on partitions plus random integer that pertains to file number
select * from dataframe_table as t4
inner join
-- #2 calculate the number of output files per partition
((select t1.partitionA, t1.partitionB, cast(t2.partition_num_rows / max_rows as int) + 1 as partition_num_files from dataframe_table) as t1
inner join
-- #1 determine number of rows in output partition
(select partitionA, partitionB, count(*) as partition_num_rows from dataframe_table group by (partitionA, partitionB)) as t2
on t1.partitionA = t2.partitionA and t1.partitionB = t2.partitionB) as t3
on t3.partitionA = t4.partitionA and t3.partitionB=t4.partitionB
distribute by (t4.partitionA, t4.partitionC, floor(rand() * t3.partition_num_files)) sort by (partitionC, sortfield)
我在这里对分区进行了排序,因为在我们的用例中,这极大地提高了压缩率,同时对性能的影响最小。
如果您从第 1 步和第 2 步得到的结果足够小,Spark 可能能够广播加入它们以加快它们的速度。
【讨论】:
【参考方案4】:好的,考虑到目标文件的大小、内存使用和执行时间,这是我完善的方法。这些文件还包括快速压缩和字典编码。
我的 HDFS 块大小是 128 兆 (128 * 1024 * 1024):
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
这是我最终的 parquet 文件,它们都非常接近 hdfs 块的大小。
133916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
133459404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
133668445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
134004329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
134015650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
132053162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
132917851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
122594040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet
我就是这样做的..
A.想出一个粗略的行数来生成一堆 10 兆左右的 SMALL parquet 文件。就我而言,我选择了 200,000 条记录。许多较小的 parquet 文件比一个大型 parquet 文件更节省空间,因为如果单个文件中的数据种类更多,字典编码和其他压缩技术就会被放弃。一次写大约 10 兆也可以释放内存。
您的文件将如下所示:
07916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
12259404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
11368445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
07044329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
13145650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
08534162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
12178451 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
11940440 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet
09166540 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0009.parquet
12594044 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0010.parquet
11684245 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0011.parquet
07043129 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0012.parquet
13153650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0013.parquet
08533162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0014.parquet
12137851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0015.parquet
11943040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0016.parquet
B.创建所有较小 parquet 文件的列表,文件大小加在一起时不超过 HDFS 块大小。在上面的例子中:
/year=2018/month=01/HoldingDetail_201801_0001.parquet
to
/year=2018/month=01/HoldingDetail_201801_0012.parquet
plus
/year=2018/month=01/HoldingDetail_201801_0014.parquet
占用 133,408,651 字节。
C.打开一个名为 HoldingDetail_201801_temp.parquet 的新文件
一次读取一个列表中的所有较小文件,并将它们作为 parquet ROW GROUP 写入临时文件。将每个文件作为一个行组写入非常重要,这样可以保留压缩编码并保证写入的字节数(减去架构元数据)与原始文件大小相同。
删除列表中所有较小的文件。 将临时文件重命名为 HoldingDetail_201801_0001.parquet。
对剩余的较小文件重复步骤 B 和 C,以创建 *_0002.parquet、*_0003.parquet、*_0004.parquet 等,这些文件将是大小略低于 hdfs 块大小的目标文件。
(我还添加了一个检查,如果文件大小的总和 > 0.95 * dfs.blocksize 然后继续合并找到的文件)
【讨论】:
你已经很久没有做这个练习了,但我认为如果你能分享你使用的 Spark 代码,读者会发现它很有用。 :D 无论如何,很好的解决方案。【参考方案5】:Spark 中还没有 roll-after-specific-size 选项,但最好的选择是:在特定数量的记录后滚动。
由于Spark 2.2,可以设置maxRecordsPerFile
。
另见https://***.com/a/48143315/630269
【讨论】:
你知道有没有像 minRecordsPerFile 这样的东西吗? 对不起,我没看到。也许您可以在那里创建一个问题并描述您的用例?以上是关于你如何控制输出文件的大小?的主要内容,如果未能解决你的问题,请参考以下文章