如何从 S3 存储桶中仅读取最近 7 天的 csv 文件

Posted

技术标签:

【中文标题】如何从 S3 存储桶中仅读取最近 7 天的 csv 文件【英文标题】:How to read only latest 7 days csv files from S3 bucket 【发布时间】:2022-01-22 14:50:59 【问题描述】:

我正在尝试弄清楚,我们如何使用 Spark Scala 从 s3 存储桶中的文件夹中仅读取最近 7 天的文件。

我们拥有的目录:

假设今天的日期(Date_1)我们有 2 个客户和 1-1 个 csv 文件

Source/Date_1/Client_1/sample_1.csv
Source/Date_1/Client_2/sample_1.csv

明天会生成一个新文件夹,我们会得到如下:

Source/Date_2/Client_1/sample_1.csv
Source/Date_2/Client_2/sample_1.csv
Source/Date_2/Client_3/sample_1.csv
Source/Date_2/Client_4/sample_1.csv

注意:我们希望在任何日期添加更新的客户数据。

同样在第 7 天我们可以有:

Source/Date_7/Client_1/sample_1.csv
Source/Date_7/Client_2/sample_1.csv
Source/Date_7/Client_3/sample_1.csv
Source/Date_7/Client_4/sample_1.csv

所以,现在如果我们得到第 8 天的数据,我们需要丢弃 Date_1 文件夹才能读取。

在使用 s3 存储桶中的 spark scala 读取 csv 文件时,我们如何做到这一点? 我正在尝试阅读整个 "source/*" 文件夹,以便我们不会错过任何时间/一天添加任何客户端。

【问题讨论】:

您能否分享一些详细信息,例如 (1) 您是否使用 AWS Glue? (2) 什么是日期格式? (3) 这个路径是 partitionBy 还是像 directory/subdirectoty 这样的普通路径? (4) 最终的csv文件名总是一样的? 1:不只是在 S3 存储桶中获取文件并从那里直接读取文件 2:日期格式-“20211221”(例如今天的日期,2021 年 12 月 21 日) 3:普通路径,如目录/子目录4:最终文件名应取决于附加日期的客户端名称,例如(Client_1_20211221,Client_2_20211221 为今天的日期,同样对于第二天的日期部分将更改 - Client_1_20211222,Client_2_20211222) 您是否在保存时对数据进行分区?我相信你是根据我看到的文件夹结构来做的……你能确认一下吗? 不,我正在尝试在覆盖模式下相同的数据,对我的要求是根据最多 7 天的旧数据计算并保存在表中。所以,没有分区。 【参考方案1】:

有多种方法可以做到这一点。下面提到了其中一种方法: 您可以从路径中提取日期,过滤器基于 7 天。

下面是pyspark的sn-p代码,同样可以用Scala在Spark中实现。

>>> from datetime import datetime, timedelta
>>> from pyspark.sql.functions import *
 
#Calculate date 7 days before date
>>> lastDate = datetime.now() + timedelta(days=-7)
>>> lastDate = int(lastDate.strftime('%Y%m%d'))

# Source Path
>>> srcPath = "s3://<bucket-name>/.../Source/"
>>> df1 = spark.read.option("header", "true").csv(srcPath + "*/*").withColumn("Date", split(regexp_replace(input_file_name(), srcPath, ""),"/")[0].cast("long"))
>>> df2 = df1.filter(col("Date") >= lit(lastDate))

在您的最终实现中可能会发生一些变化,例如如果路径结构不同,索引值 [0] 可能会有所不同,最后一个条件 >= 可以是 > 根据要求。

【讨论】:

以上是关于如何从 S3 存储桶中仅读取最近 7 天的 csv 文件的主要内容,如果未能解决你的问题,请参考以下文章

从 S3 存储桶中读取大量 CSV 文件

如何将 s3 数据从一个 EMR 集群读取到另一个 EMR 集群?

Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在

当我们从 s3 中的 csv 文件读取数据并在 aws athena 中创建表时如何跳过标题。

使用 lambda 中的 pandas 从 s3 读取 excel 文件并转换为 csv

使用 aws.s3 包从 AWS S3 一次读取多个 CSV 文件对象