使用 spark 有效过滤有序文件

Posted

技术标签:

【中文标题】使用 spark 有效过滤有序文件【英文标题】:Efficiently filter ordered file with spark 【发布时间】:2016-11-30 14:56:24 【问题描述】:

想象一下,我有一个格式如下的大日志文件:

Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text

日志文件已按时间戳排序。 如果我用 spark 作为数据帧读取文件,然后过滤 t1 和 t2 之间的时间戳, 然后他会检查每条记录的时间戳是否介于 t1 和 t2 之间,这需要很长时间。

但是,有没有一种方法可以告诉 spark df 已经订购,然后它会知道它只需要查找第一个和最后一个时间戳,并返回其间的所有行,这将是快很多?

【问题讨论】:

理想的解决方案是通过Timestamp 对输入文件进行分区,然后只读取您真正想要的文件。 Spark 支持谓词下推,因此如果您使用spark.read.parquet(myParquet.parq).filter(...),它只会为您读取相关文件。 【参考方案1】:

不,spark 中没有这样的选项。但是有不同的解决方案,广泛用于存储事件或日志的系统中,称为分区。如果您有很多天的记录,请添加仅包含日期的新列:

df.withColumn("day", df.timestamp.cast("date"))

然后使用 partitionedBy 保存此文件:

df_with_day.write.partitionBy("day").csv("partitioned")

这将为每一天创建目录(并且列 day 不会保存在重写的文件中),因此使用适当的where 过滤的下一个查询将忽略不在范围内的目录中的文件:

new_df = spark.read.csv("partitioned")
new_df.where(new.day.between("2016-11-30", "2016-12-10")).show()

【讨论】:

以上是关于使用 spark 有效过滤有序文件的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中过滤有效和无效记录

过滤计数等于输入文件 rdd Spark 的列

从文件列表而不是 Spark 中的 PATH 读取是不是有效?

我在 s3 中有 .dat 文件。我需要通过 spark 读取文件并做一些过滤器并再次加载到 S3

在 Spark 中对巨大数据帧进行高效过滤

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法