你如何控制输出文件的大小?

Posted

技术标签:

【中文标题】你如何控制输出文件的大小?【英文标题】:How do you control the size of the output file? 【发布时间】:2017-01-04 09:14:40 【问题描述】:

在 spark 中,控制输出文件大小的最佳方法是什么。例如,在 log4j 中,我们可以指定最大文件大小,之后文件会旋转。

我正在为镶木地板文件寻找类似的解决方案。写入文件时是否有最大文件大小选项可用?

我有几个解决方法,但没有一个是好的。如果我想将文件限制为 64mb,那么一种选择是重新分区数据并写入临时位置。然后使用临时位置中的文件大小将文件合并在一起。但是获得正确的文件大小是很困难的。

【问题讨论】:

只是想知道输出文件中相同大小的用例是什么。 试图保持文件大小一致。例如,当我在不同的分区中写入文件时,某些分区文件要大 10 倍。 df.repartition(35).write.mode(SaveMode.Overwrite).partitionBy(list:_*).parquet("tmp5") 【参考方案1】:

Spark 无法控制 Parquet 文件的大小,因为内存中的 DataFrame 需要在写入磁盘之前进行编码和压缩。在此过程完成之前,无法估计磁盘上的实际文件大小。

所以我的解决方案是:

将 DataFrame 写入 HDFS,df.write.parquet(path)

获取目录大小并计算文件数

val fs = FileSystem.get(sc.hadoopConfiguration)
val dirSize = fs.getContentSummary(path).getLength
val fileNum = dirSize/(512 * 1024 * 1024)  // let's say 512 MB per file

读取目录并重新写入HDFS

val df = sqlContext.read.parquet(path)
df.coalesce(fileNum).write.parquet(another_path)

不要重复使用原来的df,否则会触发你的工作两次。

删除旧目录并重新命名新目录

fs.delete(new Path(path), true)
fs.rename(new Path(newPath), new Path(path))

这个方案有个缺点,就是需要两次写入数据,磁盘IO翻倍,但目前只有这个方案。

【讨论】:

我可以用 Spark SQL 做类似的事情吗?我想控制 fileNum 并且不太关心每个文件的文件大小。 @soulmachine - 您能否详细说明“不要重复使用原始 df,否则会触发您的工作两次。”【参考方案2】:

这是我的解决方案,对我来说很有趣。

val repartition_num = 20  
val hqc = new org.apache.spark.sql.hive.HiveContext(sc)
val t1 = hqc.sql("select * from customer")

// 20 parquet files will be generated in hdfs dir
// JUST control your file with partition number
t1.repartition(repartition_num ).saveAsParquetFile(parquet_dir)

结果如下:

> hadoop fs -ls /tpch-parquet/customer/*.parquet  | wc -l
20

【讨论】:

-1。这没有回答 OP 的问题(控制文件大小),而是一个完全不同的问题(控制文件数量) 如果您重新分区到更大的数字,它也会缩小平均文件大小。 wfm【参考方案3】:

正如其他人所提到的,您不能明确地达到每个文件的目标大小。但是,您可以让所有输出文件的行数大致相同。如果您平均知道压缩比是什么样的,那么在输出文件中均匀分布行(最大为 max_rows)将使您的目标大小保持一致。

如果你在写之前做一个partitionBy,这说起来容易做起来难。下面是我们如何做到这一点的一些伪代码:

-- #3 distribute partitionC's rows based on partitions plus random integer that pertains to file number
select * from dataframe_table as t4
inner join

    -- #2 calculate the number of output files per partition
    ((select t1.partitionA, t1.partitionB, cast(t2.partition_num_rows / max_rows as int) + 1 as partition_num_files from dataframe_table) as t1
        inner join 

        -- #1 determine number of rows in output partition
        (select partitionA, partitionB, count(*) as partition_num_rows from dataframe_table group by (partitionA, partitionB)) as t2
        on t1.partitionA = t2.partitionA and t1.partitionB = t2.partitionB) as t3

on t3.partitionA = t4.partitionA and t3.partitionB=t4.partitionB
distribute by (t4.partitionA, t4.partitionC, floor(rand() * t3.partition_num_files)) sort by (partitionC, sortfield)

我在这里对分区进行了排序,因为在我们的用例中,这极大地提高了压缩率,同时对性能的影响最小。

如果您从第 1 步和第 2 步得到的结果足够小,Spark 可能能够广播加入它们以加快它们的速度。

【讨论】:

【参考方案4】:

好的,考虑到目标文件的大小、内存使用和执行时间,这是我完善的方法。这些文件还包括快速压缩和字典编码。

我的 HDFS 块大小是 128 兆 (128 * 1024 * 1024):

<property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
</property>

这是我最终的 parquet 文件,它们都非常接近 hdfs 块的大小。

133916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
133459404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
133668445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
134004329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
134015650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
132053162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
132917851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
122594040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet

我就是这样做的..

A.想出一个粗略的行数来生成一堆 10 兆左右的 SMALL parquet 文件。就我而言,我选择了 200,000 条记录。许多较小的 parquet 文件比一个大型 parquet 文件更节省空间,因为如果单个文件中的数据种类更多,字典编码和其他压缩技术就会被放弃。一次写大约 10 兆也可以释放内存。

您的文件将如下所示:

07916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
12259404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
11368445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
07044329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
13145650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
08534162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
12178451 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
11940440 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet
09166540 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0009.parquet
12594044 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0010.parquet
11684245 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0011.parquet
07043129 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0012.parquet
13153650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0013.parquet
08533162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0014.parquet
12137851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0015.parquet
11943040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0016.parquet

B.创建所有较小 parquet 文件的列表,文件大小加在一起时不超过 HDFS 块大小。在上面的例子中:

/year=2018/month=01/HoldingDetail_201801_0001.parquet
to
/year=2018/month=01/HoldingDetail_201801_0012.parquet
plus
/year=2018/month=01/HoldingDetail_201801_0014.parquet

占用 133,408,651 字节。

C.打开一个名为 HoldingDetail_201801_temp.parquet 的新文件

一次读取一个列表中的所有较小文件,并将它们作为 parquet ROW GROUP 写入临时文件。将每个文件作为一个行组写入非常重要,这样可以保留压缩编码并保证写入的字节数(减去架构元数据)与原始文件大小相同。

删除列表中所有较小的文件。 将临时文件重命名为 HoldingDetail_201801_0001.parquet。

对剩余的较小文件重复步骤 B 和 C,以创建 *_0002.parquet、*_0003.parquet、*_0004.parquet 等,这些文件将是大小略低于 hdfs 块大小的目标文件。

(我还添加了一个检查,如果文件大小的总和 > 0.95 * dfs.blocksize 然后继续合并找到的文件)

【讨论】:

你已经很久没有做这个练习了,但我认为如果你能分享你使用的 Spark 代码,读者会发现它很有用。 :D 无论如何,很好的解决方案。【参考方案5】:

Spark 中还没有 roll-after-specific-size 选项,但最好的选择是:在特定数量的记录后滚动。

由于Spark 2.2,可以设置maxRecordsPerFile

另见https://***.com/a/48143315/630269

【讨论】:

你知道有没有像 minRecordsPerFile 这样的东西吗? 对不起,我没看到。也许您可以在那里创建一个问题并描述您的用例?

以上是关于你如何控制输出文件的大小?的主要内容,如果未能解决你的问题,请参考以下文章

tmpgenc dvd author能控制文件输出大小吗?

如何在 R 中控制光栅文件的大小

如何确定空格中的制表符字符大小? [关闭]

如何知道我的下一次调用printf()是否会超过屏幕大小?

怎样控制catalina.out文件的大小

你如何改变引导轮播控制链接的大小?