从 S3 存储桶中读取大量 CSV 文件
Posted
技术标签:
【中文标题】从 S3 存储桶中读取大量 CSV 文件【英文标题】:Read large number of CSV files from S3 bucket 【发布时间】:2020-09-08 23:16:42 【问题描述】:我想从 S3 存储桶中读取大量 csv 文件。 CSV 文件位于不同的分区中。我正在使用 Boto3 列出所有到 csv 的路径。然后使用 for 循环在列表上迭代,将 csv 文件读入 spark 数据帧。我需要一种更好的优化方法来从 S3 路径读取大量文件,因为循环是一种线性方法,需要大量时间才能完成。 列出所有对象:
self.all_objects = [file_path['Key'] for resp_content in self.s3.get_paginator("list_objects_v2").paginate(Bucket='bucketName') for file_path in resp_content['Contents']]
循环读取每个 CSV 文件:
csv_df = self.spark.read.format("csv").option("header", "true").load(s3_path)
我还想将所有数据框合并在一起以创建一个镶木地板文件。
提前感谢,如果有人对此有好的解决方案,请提出建议。
【问题讨论】:
【参考方案1】:读取包含多个文件夹的 csv 文件(Spark + Scala):
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType
val sourcesFolders = List("/home/mykolavasyliv/tmp/source1/", "/home/mykolavasyliv/tmp/source2/", "/home/mykolavasyliv/tmp/source3/")
// :~/tmp$ tree
// .
// ├── source1
// │ └── person-data-1.csv
// ├── source2
// │ └── person-data-2.csv
// └── source3
// └── person-data-3.csv
// person-data-1.csv:
// source-1-1,Mykola ,23,100
// source-1-2,Jon,34,76
// source-1-3,Marry,25,123
// person-data-2.csv
// source-2-1,Mykola ,23,100
// source-2-2,Jon,34,76
// source-2-3,Marry,25,123
// person-data-3.csv
// source-3-1,Mykola ,23,100
// source-3-2,Jon,34,76
// source-3-3,Marry,25,123
// Read csv files not use schema
val sourceDF = spark.read.csv(sourcesFolders:_*)
sourceDF.show(false)
// +----------+-------+---+---+
// |_c0 |_c1 |_c2|_c3|
// +----------+-------+---+---+
// |source-1-1|Mykola |23 |100|
// |source-1-2|Jon |34 |76 |
// |source-1-3|Marry |25 |123|
// |source-2-1|Mykola |23 |100|
// |source-2-2|Jon |34 |76 |
// |source-2-3|Marry |25 |123|
// |source-3-1|Mykola |23 |100|
// |source-3-2|Jon |34 |76 |
// |source-3-3|Marry |25 |123|
// +----------+-------+---+---+
// Read csv files use schema
val schema = StructType(
List(
StructField("id", StringType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("NotKnow", IntegerType, true)
)
)
val source2DF = spark.read.schema(schema).csv(sourcesFolders:_*)
source2DF.show(false)
// +----------+-------+---+-------+
// |id |name |age|NotKnow|
// +----------+-------+---+-------+
// |source-1-1|Mykola |23 |100 |
// |source-1-2|Jon |34 |76 |
// |source-1-3|Marry |25 |123 |
// |source-2-1|Mykola |23 |100 |
// |source-2-2|Jon |34 |76 |
// |source-2-3|Marry |25 |123 |
// |source-3-1|Mykola |23 |100 |
// |source-3-2|Jon |34 |76 |
// |source-3-3|Marry |25 |123 |
// +----------+-------+---+-------+
【讨论】:
虽然只有代码的答案可能会回答这个问题,但您可以通过为您的代码提供上下文、此代码工作的原因以及一些文档参考以供进一步阅读,从而显着提高您的答案质量. @PranavHosangadi 对不起。谢谢你。固定。以上是关于从 S3 存储桶中读取大量 CSV 文件的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在
如何将 s3 数据从一个 EMR 集群读取到另一个 EMR 集群?
使用 lambda 中的 pandas 从 s3 读取 excel 文件并转换为 csv