如何并行化 spark.read.parquet()?
Posted
技术标签:
【中文标题】如何并行化 spark.read.parquet()?【英文标题】:How to parallelize spark.read.parquet()? 【发布时间】:2020-02-11 20:09:52 【问题描述】:我的 Spark 作业读取了一个文件夹,其中包含由列 partition 分区的 parquet 数据:
val spark = SparkSession
.builder()
.appName("Prepare Id Mapping")
.getOrCreate()
import spark.implicits._
spark.read
.parquet(sourceDir)
.filter($"field" === "ss_id" and $"int_value".isNotNull)
.select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
.coalesce(1)
.write
.partitionBy("date")
.parquet(idMappingDir)
我注意到只创建了一个任务,所以它非常慢。源文件夹中有很多像partition=2019-01-07这样的子文件夹,每个子文件夹都包含很多扩展名为snappy.parquet的文件。我提交了作业 --num-executors 2 --executor-cores 4,RAM 不是问题。我尝试从 S3 和本地文件系统中读取。我尝试添加 .repartition(nPartitions),删除 .coalesce(1) 和 .partitionBy("date") 但还是一样。
您能否建议我如何让 Spark 并行读取这些 parquet 文件?
【问题讨论】:
没有什么奇怪的。如果您发现差异,请尝试阅读 sourceDir/2019-01-07 @Salim 这对我来说会有所帮助,因为我需要数据框中的分区列。 “合并架构”上的好发现。请更改您的问题以添加这种担忧,即每个分区可能具有不同的架构以帮助其他人找到答案。 【参考方案1】:好吧,我找到了正确的代码:
val spark = SparkSession
.builder()
.appName("Prepare Id Mapping")
.getOrCreate()
import spark.implicits._
spark.read
.option("mergeSchema", "true")
.parquet(sourceDir)
.filter($"field" === "ss_id" and $"int_value".isNotNull)
.select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
.write
.partitionBy("date")
.parquet(idMappingDir)
希望这会在将来节省一些时间。
【讨论】:
聚结取出来? @thebluephantom 只是取出合并并没有解决问题,但是加上 .option("mergeSchema", "true") 就可以了。 可能会强调这一点。干杯 我试图使这两行(带有合并和选项)以粗体显示,但它在代码 sn-p 中不起作用。以上是关于如何并行化 spark.read.parquet()?的主要内容,如果未能解决你的问题,请参考以下文章