Spark s3 csv文件读取顺序

Posted

技术标签:

【中文标题】Spark s3 csv文件读取顺序【英文标题】:Spark s3 csv files read order 【发布时间】:2022-01-13 23:40:30 【问题描述】:

假设 s3 文件夹中的三个文件,通过 spark.read.csv(s3:bucketname/folder1/*.csv) 读取是否按顺序读取文件? 如果没有,有没有办法在读取整个文件夹时对文件进行排序,同时在内部不同时间收到多个文件。

File name s3 file uploaded/Last modified time
s3:bucketname/folder1/file1.csv 01:00:00
s3:bucketname/folder1/file2.csv 01:10:00
s3:bucketname/folder1/file3.csv 01:20:00

【问题讨论】:

接收时间如何指定?它是您数据中的一列,还是类似于文件中的时间戳 这是s3文件上传时间/最后修改时间。 【参考方案1】:

您可以使用以下方法实现此目的

    遍历存储桶中的所有文件并通过添加新列 last_modified 加载该 csv。保留将在dfs_list 中加载的所有 dfs 的列表。由于 pyspark 进行惰性评估,它不会立即加载数据。
import boto3

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('bucketname')

dfs_list = []

for file_object in my_bucket.objects.filter(Prefix="folder1/"):
    df = spark.read.parquet('s3a://' + file_object.name).withColumn("modified_date", file_object.last_modified)
    dfs_list.append(df)
    现在使用 pyspark unionAll 函数对所有 dfs 进行联合,然后根据 modified_date 对数据进行排序。
from functools import reduce
from pyspark.sql import DataFrame

df_combined = reduce(DataFrame.unionAll, dfs_list)

df_combined = df_combined.orderBy('modified_date')

【讨论】:

以上是关于Spark s3 csv文件读取顺序的主要内容,如果未能解决你的问题,请参考以下文章

从 S3 存储桶中读取大量 CSV 文件

从 s3 读取文件时 Spark 应用程序停止

将读取文件的架构存储到 spark scala 中的 csv 文件中

使用 pyspark 从 AWS s3 Bucket 读取 csv 时出错

是否可以以相同或不同的顺序将具有相同标题或标题子集的多个 csv 文件读取到 spark 数据帧中?

无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧