并行加载 S3 文件 Spark

Posted

技术标签:

【中文标题】并行加载 S3 文件 Spark【英文标题】:Load S3 files in parallel Spark 【发布时间】:2017-07-28 15:03:32 【问题描述】:

我通过以下代码成功地将文件从 S3 加载到 Spark 中。它正在工作,但是我注意到一个文件和另一个文件之间存在延迟,并且它们是按顺序加载的。我想通过并行加载来改进这一点。

        // Load files that were loaded into firehose on this day
    var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd

    // Apply the schema to the RDD, here we will have duplicates
    val usersDataFrame = spark.createDataFrame(s3Files, schema)

    usersDataFrame.createOrReplaceTempView("results")

    // Clean and use partition by the keys to eliminate duplicates and get latest record
    var results = spark.sql(buildCleaningQuery(job, "results"))
    results.createOrReplaceTempView("filteredResults")
    val records = spark.sql("select count(*) from filteredResults")

我也尝试过通过 textFile() 方法加载,但是在将 RDD[String] 转换为 RDD[Row] 时遇到问题,因为之后我需要继续使用 Spark SQL。我以以下方式使用它;

        var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD()

将 JSON 文件(每个大约 50MB 的多个文件)加载到 Spark 中的理想方式是什么?我想根据架构验证属性,以便以后能够触发 SQL 查询来清理数据。

【问题讨论】:

是否有必要将 s3Files 更改为 rdd?我相信如果您不将其更改为 rdd,它将并行提取文件内容。 最终结果将是一个数据框,然后在其上运行 Spark SQL 查询并保存到 redshift。如果不转换为 RDD,我将无法遵循该逻辑,除非我遗漏了什么。 var s3Files = spark.sqlContext.read.schema(schema).json(...).createOrReplaceTempView("results") 应该足够了。试试看它是否仍然按顺序读取内容 @wllmtrng 请将此评论作为答案,表明我不需要转换为 RDD 即可在 Spark SQL 中创建和替换视图。对于其他有同样问题的人来说会更清楚。我不知道您可以如此轻松地访问扩展方法。谢谢! 【参考方案1】:

发生了什么是DataFrame被转换为RDD,然后再次转换为DataFrame,然后丢失了分区信息。

var s3Files = spark
  .sqlContext
  .read.schema(schema)
  .json(...)
  .createOrRepla‌​ceTempView("results"‌​)

应该足够了,并且分区信息应该仍然存在,允许同时加载json文件。

【讨论】:

以上是关于并行加载 S3 文件 Spark的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中并行创建 RDD / DataFrame?

使用 spark 下载、处理、上传大量 s3 文件

Spark---并行度和分区

并行使用 scala Spark 重命名 HDFS 文件时的序列化问题

如何并行处理数据但将结果写入 Spark 中的单个文件

Spark[四]——Spark并行度