编写拼花文件时如何避免空文件?

Posted

技术标签:

【中文标题】编写拼花文件时如何避免空文件?【英文标题】:How to avoid empty files while writing parquet files? 【发布时间】:2017-09-26 21:25:10 【问题描述】:

我正在使用 Spark 结构化流从 Kafka 队列中读取数据。从 Kafka 阅读后,我在 dataframe 上应用 filter。我将此过滤后的数据框保存到镶木地板文件中。这会生成许多空的镶木地板文件。有什么办法可以停止写一个空文件?

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()

Transaction_DF = df.selectExpr("CAST(value AS STRING)")

decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decomDF.filter(.....) 

query = filterDF .writeStream \
    .option("path", outputpath) \
    .option("checkpointLocation", RawXMLCheckpoint) \
    .start()

【问题讨论】:

这与我面临的类似挑战然后我决定将数据写入 hbase 然后向下流使用 hbase。在您的情况下,您也可以将其写入一些 Nosql 数据库以避免许多小文件。 【参考方案1】:

有什么办法可以停止写空文件。

是的,但你宁愿这样做。

许多空 parquet 文件的原因是 Spark SQL(结构化流的底层基础设施)试图猜测要加载数据集的分区数量(每批包含来自 Kafka 的记录)并且这样做“很差”,即很多分区没有数据。

当你保存一个没有数据的分区时,你会得到一个空文件。

您可以使用repartitioncoalesce 运算符来设置适当的分区数并减少(甚至完全避免)空文件。见Dataset API。

您为什么这样做? repartitioncoalesce 可能会由于在分区(可能还有 Spark 集群中的节点)之间混洗数据的额外步骤而导致性能下降。这可能很昂贵且不值得这样做(因此我说你宁愿不这样做)。

然后您可能会问自己,如何知道正确的分区数?这是任何 Spark 项目中的一个非常好的问题。答案相当简单(如果您了解 Spark 进行处理的内容和方式,则显而易见):“了解您的数据”,这样您就可以计算出有多少是完全正确的。

【讨论】:

【参考方案2】:

我建议在 Dataframe 上使用 repartition(partitioningColumns)。数据集和之后的partitionBy(partitioningColumns)writeStream 操作以避免写入空文件。

原因: 如果你有很多数据,瓶颈通常是 Spark 的读取性能,如果你有很多小(甚至是空的)文件并且没有分区。因此,您绝对应该使用文件/目录分区(这与 RDD 分区不同)。 这在使用 AWS S3 时尤其成问题。 在读取时间戳/日期、消息类型/Kafka 主题等数据时,partitionColumns 应该适合您的常见查询...

另请参阅 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter 上的 partitionBy 文档

按文件系统上的给定列对输出进行分区。如果指定,则输出布局在文件系统上,类似于 Hive 的分区方案。例如,当我们按年和月对数据集进行分区时,目录布局如下所示:

年=2016/月=01/,年=2016/月=02/

分区是最广泛使用的优化物理数据布局的技术之一。当查询对分区列有谓词时,它提供了一个粗粒度索引,用于跳过不必要的数据读取。为了使分区正常工作,每列中不同值的数量通常应少于数万。

这适用于所有基于 Spark 2.1.0 的基于文件的数据源(例如 Parquet、JSON)。

【讨论】:

在将结构化流保存到目录中时,每个流都作为新的部分文件附加到目录中。如果我在流上添加过滤条件并且如果流不满足则流变为空,这个空流被写入单独的文件【参考方案3】:

你可以试试 repartitionByRange(column)..

我在将数据帧写入 HDFS 时使用了它。它解决了我的空文件创建问题。

【讨论】:

【参考方案4】:

如果您使用的是yarn客户端模式,那么将执行器核心数设置为1即可解决问题。这意味着每个执行程序在任何时候都只会运行 1 个任务。

【讨论】:

以上是关于编写拼花文件时如何避免空文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在MapReduce作业中以拼花文件格式编写输出?

调试时文件上传返回空数组

从 Python 编写嵌套拼花格式

如何避免 XmlWriter 创建默认的“xmlns”空属性?

Node.js - 编写了空文件,但为啥呢?

如何有效地将大型数据框拆分为多个拼花文件?