并行加载 S3 文件 Spark
Posted
技术标签:
【中文标题】并行加载 S3 文件 Spark【英文标题】:Load S3 files in parallel Spark 【发布时间】:2017-07-28 15:03:32 【问题描述】:我通过以下代码成功地将文件从 S3 加载到 Spark 中。它正在工作,但是我注意到一个文件和另一个文件之间存在延迟,并且它们是按顺序加载的。我想通过并行加载来改进这一点。
// Load files that were loaded into firehose on this day
var s3Files = spark.sqlContext.read.schema(schema).json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").rdd
// Apply the schema to the RDD, here we will have duplicates
val usersDataFrame = spark.createDataFrame(s3Files, schema)
usersDataFrame.createOrReplaceTempView("results")
// Clean and use partition by the keys to eliminate duplicates and get latest record
var results = spark.sql(buildCleaningQuery(job, "results"))
results.createOrReplaceTempView("filteredResults")
val records = spark.sql("select count(*) from filteredResults")
我也尝试过通过 textFile() 方法加载,但是在将 RDD[String] 转换为 RDD[Row] 时遇到问题,因为之后我需要继续使用 Spark SQL。我以以下方式使用它;
var s3Files = sparkContext.textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" + job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/").toJavaRDD()
将 JSON 文件(每个大约 50MB 的多个文件)加载到 Spark 中的理想方式是什么?我想根据架构验证属性,以便以后能够触发 SQL 查询来清理数据。
【问题讨论】:
是否有必要将 s3Files 更改为 rdd?我相信如果您不将其更改为 rdd,它将并行提取文件内容。 最终结果将是一个数据框,然后在其上运行 Spark SQL 查询并保存到 redshift。如果不转换为 RDD,我将无法遵循该逻辑,除非我遗漏了什么。 var s3Files = spark.sqlContext.read.schema(schema).json(...).createOrReplaceTempView("results") 应该足够了。试试看它是否仍然按顺序读取内容 @wllmtrng 请将此评论作为答案,表明我不需要转换为 RDD 即可在 Spark SQL 中创建和替换视图。对于其他有同样问题的人来说会更清楚。我不知道您可以如此轻松地访问扩展方法。谢谢! 【参考方案1】:发生了什么是DataFrame被转换为RDD,然后再次转换为DataFrame,然后丢失了分区信息。
var s3Files = spark
.sqlContext
.read.schema(schema)
.json(...)
.createOrReplaceTempView("results")
应该足够了,并且分区信息应该仍然存在,允许同时加载json文件。
【讨论】:
以上是关于并行加载 S3 文件 Spark的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 中并行创建 RDD / DataFrame?