Spark DataSet 过滤器性能

Posted

技术标签:

【中文标题】Spark DataSet 过滤器性能【英文标题】:Spark DataSet filter performance 【发布时间】:2016-12-20 19:36:44 【问题描述】:

我一直在尝试不同的方法来过滤类型化的数据集。事实证明,性能可能完全不同。

数据集是基于 1.6 GB 的数据行创建的,其中包含 33 列和 4226047 行。 DataSet 通过加载 csv 数据创建并映射到案例类。

val df = spark.read.csv(csvFile).as[FireIncident]

UnitId = 'B02' 上的过滤器应返回 47980 行。我测试了以下三种方法: 1) 使用类型化列(本地主机约 500 毫秒)

df.where($"UnitID" === "B02").count()

2)使用临时表和sql查询(~与选项1相同)

df.createOrReplaceTempView("FireIncidentsSF")
spark.sql("SELECT * FROM FireIncidentsSF WHERE UnitID='B02'").count()

3) 使用强类型类字段(14,987ms,即慢 30 倍)

df.filter(_.UnitID.orNull == "B02").count()

我用python API再次测试,同样的数据集,时间为17046ms,与scala API option 3的性能相当。

df.filter(df['UnitID'] == 'B02').count()

有人能解释一下 3) 和 python API 的执行方式与前两个选项的不同吗?

【问题讨论】:

【参考方案1】:

这是因为步骤 3 here。

在前两个中,spark 不需要反序列化整个 Java/Scala 对象 - 它只查看一列并继续前进。

在第三个中,由于您使用的是 lambda 函数,spark 无法判断您只需要一个字段,因此它会为每一行从内存中提取所有 33 个字段,以便您可以检查一个字段.

我不确定为什么第四个这么慢。似乎它的工作方式与第一个相同。

【讨论】:

非常有见地的答案。如果你在 Dataset<Row> 上写 java:datasetRdd.filter(r -> r.<String>getAs("event_type_id").equals("LOG")) 会发生什么? @DusanVasiljevic 相同,只要您使用的是 lambda。您可以保留类型,但您必须对其执行无类型操作以避免加载到内存中。【参考方案2】:

在运行 python 时,首先将代码加载到 JVM 上,进行解释,然后最终编译成字节码。使用 Scala API 时,Scala 原生在 JVM 上运行,因此您将整个加载 python 代码剪切到 JVM 部分。

【讨论】:

Python API 和具有强类型类字段的 Scala API 过滤器具有可比的性能结果。你知道为什么选项 3) 比 1) 或 2) 慢 30 倍吗?

以上是关于Spark DataSet 过滤器性能的主要内容,如果未能解决你的问题,请参考以下文章

如何对dataset做一下过滤操作

无分区列性能的 Spark 下推过滤器

大数据Spark DataFrame/DataSet常用操作

大数据Spark DataFrame/DataSet常用操作

在 Spark 中过滤 Big number 上的数据

Spark布隆过滤器(bloomFilter)