附加到 parquet 文件,由具有重叠时间戳的数据分区

Posted

技术标签:

【中文标题】附加到 parquet 文件,由具有重叠时间戳的数据分区【英文标题】:Appending to parquet files, partitioned by data that have overlapping timestamps 【发布时间】:2021-04-19 13:45:04 【问题描述】:

我有包含时间戳列的数据框。我将其转换为日期,按日期进行分区,并每天将其附加到不断增长的镶木地板文件中。

如果我附加一个时间戳从 2021-04-19 01:00:012021-04-19 13:00:00 的数据集,它会将其写入分区 DATE=2021-04-19 中的镶木地板。

如果在当天晚些时候我将另一个时间戳从2021-04-19 15:00:00 附加到2021-04-19 20:00:00 的数据集,它会覆盖之前从凌晨 1 点到下午 1 点有数据的分区吗?还是它实际上会附加到它上面?

我使用语法:

df.write.mode('append').partitionBy("DATE").parquet("s3://path")

【问题讨论】:

【参考方案1】:

来自Spark Documentation中的保存模式:

追加:将 DataFrame 保存到数据源时,如果数据/表已经 存在,DataFrame 的内容应附加到 现有数据。

因此,它符合您的预期。这是一个检查行为的玩具示例:

data_batch_1 = [("2021-04-19", "2021-04-19 01:00:01", 1.1), 
                ("2021-04-19", "2021-04-19 13:00:00", 1.2)]

data_batch_2 = [("2021-04-19", "2021-04-19 15:00:00", 2.1), 
                ("2021-04-19", "2021-04-19 20:00:00", 2.2)]

col_names = ["DATE", "ts", "sensor1"]

df_batch_1 = spark.createDataFrame(data_batch_1, col_names)
df_batch_2 = spark.createDataFrame(data_batch_2, col_names)

s3_path = "/tmp/67163237/"

保存批次 1

df_batch_1.write.mode("append").partitionBy("DATE").parquet(s3_path)
spark.read.parquet(s3_path).show()
+-------------------+-------+----------+
|                 ts|sensor1|      DATE|
+-------------------+-------+----------+
|2021-04-19 01:00:01|    1.1|2021-04-19|
|2021-04-19 13:00:00|    1.2|2021-04-19|
+-------------------+-------+----------+

保存批次 2

df_batch_2.write.mode("append").partitionBy("DATE").parquet(s3_path)
spark.read.parquet(s3_path).show()
+-------------------+-------+----------+
|                 ts|sensor1|      DATE|
+-------------------+-------+----------+
|2021-04-19 15:00:00|    2.1|2021-04-19|
|2021-04-19 01:00:01|    1.1|2021-04-19|
|2021-04-19 20:00:00|    2.2|2021-04-19|
|2021-04-19 13:00:00|    1.2|2021-04-19|
+-------------------+-------+----------+

【讨论】:

对@thentangler 的建议是尽可能使用 Delta 格式,特别是在您使用数据流时。 docs.databricks.com/delta/delta-streaming.html【参考方案2】:

按照 mck 的出色建议(您在尝试之前不会知道),我做到了,而且正如担心的那样,它基本上用新数据覆盖了整个分区。

我想了想,我决定总是重新流式传输前一天并覆盖分区。在我的情况下它对我有用,因为我确实可以访问 5 天的缓冲区数据,我可以重新提取这些数据。但这种解决方案不适用于那些拥有仅保留几个小时左右的临时数据的人。

【讨论】:

以上是关于附加到 parquet 文件,由具有重叠时间戳的数据分区的主要内容,如果未能解决你的问题,请参考以下文章

如何附加到镶木地板文件以及它如何影响分区?

使用 parquet 格式附加 Apache Spark 中列的描述

从具有时间戳的镶木地板蜂巢表中读取火花

附加具有多列索引和重叠列名的 DataFrame

Apache FilesMatch:如何仅匹配附加时间戳的资产

从 parquet 文件将具有默认值的数据加载到 Redshift