如何从 S3 存储桶中仅读取最近 7 天的 csv 文件
Posted
技术标签:
【中文标题】如何从 S3 存储桶中仅读取最近 7 天的 csv 文件【英文标题】:How to read only latest 7 days csv files from S3 bucket 【发布时间】:2022-01-22 14:50:59 【问题描述】:我正在尝试弄清楚,我们如何使用 Spark Scala 从 s3 存储桶中的文件夹中仅读取最近 7 天的文件。
我们拥有的目录:
假设今天的日期(Date_1)我们有 2 个客户和 1-1 个 csv 文件
Source/Date_1/Client_1/sample_1.csv
Source/Date_1/Client_2/sample_1.csv
明天会生成一个新文件夹,我们会得到如下:
Source/Date_2/Client_1/sample_1.csv
Source/Date_2/Client_2/sample_1.csv
Source/Date_2/Client_3/sample_1.csv
Source/Date_2/Client_4/sample_1.csv
注意:我们希望在任何日期添加更新的客户数据。
同样在第 7 天我们可以有:
Source/Date_7/Client_1/sample_1.csv
Source/Date_7/Client_2/sample_1.csv
Source/Date_7/Client_3/sample_1.csv
Source/Date_7/Client_4/sample_1.csv
所以,现在如果我们得到第 8 天的数据,我们需要丢弃 Date_1 文件夹才能读取。
在使用 s3 存储桶中的 spark scala 读取 csv 文件时,我们如何做到这一点? 我正在尝试阅读整个 "source/*" 文件夹,以便我们不会错过任何时间/一天添加任何客户端。
【问题讨论】:
您能否分享一些详细信息,例如 (1) 您是否使用 AWS Glue? (2) 什么是日期格式? (3) 这个路径是 partitionBy 还是像 directory/subdirectoty 这样的普通路径? (4) 最终的csv文件名总是一样的? 1:不只是在 S3 存储桶中获取文件并从那里直接读取文件 2:日期格式-“20211221”(例如今天的日期,2021 年 12 月 21 日) 3:普通路径,如目录/子目录4:最终文件名应取决于附加日期的客户端名称,例如(Client_1_20211221,Client_2_20211221 为今天的日期,同样对于第二天的日期部分将更改 - Client_1_20211222,Client_2_20211222) 您是否在保存时对数据进行分区?我相信你是根据我看到的文件夹结构来做的……你能确认一下吗? 不,我正在尝试在覆盖模式下相同的数据,对我的要求是根据最多 7 天的旧数据计算并保存在表中。所以,没有分区。 【参考方案1】:有多种方法可以做到这一点。下面提到了其中一种方法: 您可以从路径中提取日期,过滤器基于 7 天。
下面是pyspark的sn-p代码,同样可以用Scala在Spark中实现。
>>> from datetime import datetime, timedelta
>>> from pyspark.sql.functions import *
#Calculate date 7 days before date
>>> lastDate = datetime.now() + timedelta(days=-7)
>>> lastDate = int(lastDate.strftime('%Y%m%d'))
# Source Path
>>> srcPath = "s3://<bucket-name>/.../Source/"
>>> df1 = spark.read.option("header", "true").csv(srcPath + "*/*").withColumn("Date", split(regexp_replace(input_file_name(), srcPath, ""),"/")[0].cast("long"))
>>> df2 = df1.filter(col("Date") >= lit(lastDate))
在您的最终实现中可能会发生一些变化,例如如果路径结构不同,索引值 [0] 可能会有所不同,最后一个条件 >= 可以是 > 根据要求。
【讨论】:
以上是关于如何从 S3 存储桶中仅读取最近 7 天的 csv 文件的主要内容,如果未能解决你的问题,请参考以下文章
如何将 s3 数据从一个 EMR 集群读取到另一个 EMR 集群?
Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在
当我们从 s3 中的 csv 文件读取数据并在 aws athena 中创建表时如何跳过标题。