为啥火花延迟加载比通配符或在数组中传递文件夹慢?
Posted
技术标签:
【中文标题】为啥火花延迟加载比通配符或在数组中传递文件夹慢?【英文标题】:Why spark lazy loading slower than wildcarding or passing folders in array?为什么火花延迟加载比通配符或在数组中传递文件夹慢? 【发布时间】:2019-08-13 12:54:57 【问题描述】:我有一个数据集,它被分区并作为一组 JSON 文件写入 s3。
分区结构是: uid - 通常
一个分区中有一个 JSON 文件。普通数据量~30GB
当我传递根文件夹时,因为它需要 ~3 分 30 秒 来打印文件架构。
val df = spark.read.json("s3://path/to/root/folder")
df.printSchema()
当我通过根文件夹但执行初始过滤时,它也需要 ~3 分 30 秒。
val df = spark.read.json("s3://path/to/root/folder")
.filter(col("uid") === 1 || col("uid") === 80004)
df.printSchema()
我通过通配所有分区将加载时间减少到 2 分 31 秒
val df = spark.read.json("s3://path/to/root/folder/uid=*/type=*/date=*/hour=*")
df.printSchema()
我尝试将所有需要的 uid 括起来,它起作用了。我将加载时间减少到 ~34 秒
val df = spark.read
.option("basePath", "s3://path/to/root/folder")
.json("s3://path/to/root/folder/uid=1,80004")
df.printSchema()
我也尝试在数组中添加所需的文件夹并得到相同的时间。
val df = spark.read
.option("basePath", "s3://path/to/root/folder/")
.json("s3://path/to/root/folder/uid=1/type=OW/date=2019-07-24/",
"s3://path/to/root/folder/uid=1/type=OW/date=2019-07-25/",
"s3://path/to/root/folder/
...
"s3://path/to/root/folder/uid=80004/type=OW/date=2019-08-13")
我认为延迟加载应该对我有所帮助,但事实并非如此。谁能给我解释一下为什么?
【问题讨论】:
【参考方案1】:我不确定为什么您使用通配符的时间越来越快.. 但是对于显式文件夹与完整数据的时间差异可能是由于 spark 阅读器需要对其中的所有数据进行完整扫描为了推断架构。当您调用 read 时,架构推断步骤会立即发生,因为 spark 需要知道架构才能继续并导致 Spark 在后台为您执行操作。
您可以尝试将 samplingRatio 选项从默认值 1.0 降低:
val df = spark.read.option("samplingRatio", "0.1").json("s3://...")
或者您可以尝试显式指定架构:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("uid",IntegerType,true) ::
StructField("type",StringType,true) ::
... :: Nil
)
val df = spark.read.schema(schema).json("s3://.....")
仅加载数据的某些分区的谓词下推优化确实发生了,但在您对该加载的数据帧执行操作之前它不会发生。您可以通过运行解释来确认这一点,并在 PartitionFilters 部分查看如下所示的物理计划:
df.filter($"uid" === "1" && $"type" === "bob").explain(true)
== Parsed Logical Plan ==
'Filter (('uid = 1) && ('type = bob))
+- Relation[key#70,key2#71,uid#72,type#73] json
== Analyzed Logical Plan ==
key: string, key2: string, uid: int, type: string
Filter ((uid#72 = cast(1 as int)) && (type#73 = bob))
+- Relation[key#70,key2#71,uid#72,type#73] json
== Optimized Logical Plan ==
Filter (((isnotnull(uid#72) && isnotnull(type#73)) && (uid#72 = 1)) && (type#73 = bob))
+- Relation[key#70,key2#71,uid#72,type#73] json
== Physical Plan ==
*(1) FileScan json [key#70,key2#71,uid#72,type#73] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../test/***], PartitionCount: 1, PartitionFilters: [isnotnull(uid#72), isnotnull(type#73), (uid#72 = 1), (type#73 = bob)], PushedFilters: [], ReadSchema: struct<key:string,key2:string>
【讨论】:
感谢您的回答。减少samplingRatio
没有效果。显式指定架构将加载时间减少到 1 分 45 秒,这比通配符要好,但比括号要慢。
所有分区总共有多少个 json 文件? S3 列出数量多的文件可能会很慢,分区发现代码可能需要列出所有文件以找出存在哪些分区。
每个分区 1 个。 ~2000 个文件,单个文件约 0.5mb以上是关于为啥火花延迟加载比通配符或在数组中传递文件夹慢?的主要内容,如果未能解决你的问题,请参考以下文章
为啥人们说 require_once 比 require 慢? [复制]
为啥 Hadoop SequenceFile 写入比读取慢得多?