从 S3 存储桶中读取大量 CSV 文件

Posted

技术标签:

【中文标题】从 S3 存储桶中读取大量 CSV 文件【英文标题】:Read large number of CSV files from S3 bucket 【发布时间】:2020-09-08 23:16:42 【问题描述】:

我想从 S3 存储桶中读取大量 csv 文件。 CSV 文件位于不同的分区中。我正在使用 Boto3 列出所有到 csv 的路径。然后使用 for 循环在列表上迭代,将 csv 文件读入 spark 数据帧。我需要一种更好的优化方法来从 S3 路径读取大量文件,因为循环是一种线性方法,需要大量时间才能完成。 列出所有对象:

self.all_objects = [file_path['Key'] for resp_content in self.s3.get_paginator("list_objects_v2").paginate(Bucket='bucketName') for file_path in resp_content['Contents']]

循环读取每个 CSV 文件:

csv_df = self.spark.read.format("csv").option("header", "true").load(s3_path)

我还想将所有数据框合并在一起以创建一个镶木地板文件。

提前感谢,如果有人对此有好的解决方案,请提出建议。

【问题讨论】:

【参考方案1】:

读取包含多个文件夹的 csv 文件(Spark + Scala):

import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType

val sourcesFolders = List("/home/mykolavasyliv/tmp/source1/", "/home/mykolavasyliv/tmp/source2/", "/home/mykolavasyliv/tmp/source3/")

//  :~/tmp$ tree
//    .
//  ├── source1
//  │   └── person-data-1.csv
//  ├── source2
//  │   └── person-data-2.csv
//  └── source3
//      └── person-data-3.csv

//  person-data-1.csv:
//  source-1-1,Mykola ,23,100
//  source-1-2,Jon,34,76
//  source-1-3,Marry,25,123

//  person-data-2.csv
//  source-2-1,Mykola ,23,100
//  source-2-2,Jon,34,76
//  source-2-3,Marry,25,123

//  person-data-3.csv
//  source-3-1,Mykola ,23,100
//  source-3-2,Jon,34,76
//  source-3-3,Marry,25,123




// Read csv files not use schema

val sourceDF = spark.read.csv(sourcesFolders:_*)

sourceDF.show(false)
//  +----------+-------+---+---+
//  |_c0       |_c1    |_c2|_c3|
//  +----------+-------+---+---+
//  |source-1-1|Mykola |23 |100|
//  |source-1-2|Jon    |34 |76 |
//  |source-1-3|Marry  |25 |123|
//  |source-2-1|Mykola |23 |100|
//  |source-2-2|Jon    |34 |76 |
//  |source-2-3|Marry  |25 |123|
//  |source-3-1|Mykola |23 |100|
//  |source-3-2|Jon    |34 |76 |
//  |source-3-3|Marry  |25 |123|
//  +----------+-------+---+---+



// Read csv files use schema

val schema = StructType(
  List(
    StructField("id", StringType, true),
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("NotKnow", IntegerType, true)
  )
)

val source2DF = spark.read.schema(schema).csv(sourcesFolders:_*)

source2DF.show(false)
//  +----------+-------+---+-------+
//  |id        |name   |age|NotKnow|
//  +----------+-------+---+-------+
//  |source-1-1|Mykola |23 |100    |
//  |source-1-2|Jon    |34 |76     |
//  |source-1-3|Marry  |25 |123    |
//  |source-2-1|Mykola |23 |100    |
//  |source-2-2|Jon    |34 |76     |
//  |source-2-3|Marry  |25 |123    |
//  |source-3-1|Mykola |23 |100    |
//  |source-3-2|Jon    |34 |76     |
//  |source-3-3|Marry  |25 |123    |
//  +----------+-------+---+-------+

【讨论】:

虽然只有代码的答案可能会回答这个问题,但您可以通过为您的代码提供上下文、此代码工作的原因以及一些文档参考以供进一步阅读,从而显着提高您的答案质量. @PranavHosangadi 对不起。谢谢你。固定。

以上是关于从 S3 存储桶中读取大量 CSV 文件的主要内容,如果未能解决你的问题,请参考以下文章

如何从 S3 存储桶中仅读取最近 7 天的 csv 文件

Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在

如何将 s3 数据从一个 EMR 集群读取到另一个 EMR 集群?

使用 lambda 中的 pandas 从 s3 读取 excel 文件并转换为 csv

使用 aws.s3 包从 AWS S3 一次读取多个 CSV 文件对象

使用 Node 从 s3 存储桶中读取 txt 文件的内容