Spark-SQL 数据帧外部数据源效率低

Posted

技术标签:

【中文标题】Spark-SQL 数据帧外部数据源效率低【英文标题】:Spark-SQL dataframe External Datasource low efficiency 【发布时间】:2016-01-09 02:47:53 【问题描述】:

当我尝试对 Spark-SQL 外部数据源进行一些测试时会发生此问题。

我以两种方式构建数据框,并比较收集动作的速度。而且我发现如果列数太大,从外部数据源构建的数据框会滞后。我想知道这是否是 Spark-SQL 的外部数据源的限制。 :-)

为了更清楚地提出问题,我写了一段代码:

https://github.com/sunheehnus/spark-sql-test/

在我的 External Datasource API 基准代码中,它实现了一个假的外部数据源(实际上是一个 RDD[String, Array[Int]] ),并通过

val cmpdf = sqlContext.load("com.redislabs.test.dataframeRP", Map[String, String]())

然后我构建相同的RDD并通过

val rdd = sqlContext.sparkContext.parallelize(1 to 2048, 3)
val mappedrdd = rdd.map(x =>(x.toString, (x to x + colnum).toSeq.toArray))
val df = mappedrdd.toDF()
val dataColExpr = (0 to colnum).map(_.toString).zipWithIndex.map  case (key, i) => s"_2[$i] AS `$key`" 
val allColsExpr = "_1 AS instant" +: dataColExpr
val df1 = df.selectExpr(allColsExpr: _*)

当我运行测试代码时,我可以看到结果(在我的笔记本电脑上):

9905
21427

但是当我把column less(512),我可以看到结果:

4323
2221

看起来问题是如果Schema中的列数很少,External Datasource API会受益,但是随着Schema中列数的增长,External Datasource API最终会落后......我想知道如果这是 Spark-SQL 对外部数据源 API 的限制,还是我以错误的方式使用 API? 非常感谢。 :-)

【问题讨论】:

【参考方案1】:

您没有在此处进行基准测试。这个基准简单地得出结论,代码的更昂贵版本(即您编写的代码)比内置代码更昂贵。

你的结果有两个原因:

    当您声明一个字段(常量或普通列)时,Spark SQL 实际上会生成相当优化的代码来展开循环。当您自己实现数据源时,您没有展开循环。事实上,您不仅没有展开循环,而且还对每一行进行了一些昂贵的操作,例如“(x to x + colnum).toSeq.toArray”。所有这些都在 Spark SQL 生成的代码中被消除了。

    如果“needConversion”为 false,Spark SQL 在从数据源获取数据时会引入入站转换。此入站转换将外部行格式转换为内部行格式(例如,字符串不再是 Java 字符串,而是 UTF8 编码的字符串)。

【讨论】:

你好rxin,非常感谢你的帮助。将override val needConversion: Boolean = false 添加到case class SCAN,使用外部数据源API 的结果更加高效。 :-) 你好rxin,切换到Spark-1.6,如果添加override val needConversion: Boolean = false,会遇到异常Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.InternalRow。将needConversion添加到Spark-1.4效果很好,使用外部数据源的效果更好,但是切换到Spark-1.6后就不能再使用了……是needConversion相关的功能导致延迟的原因吗? col数很大吗?有什么好的方法可以避免这种情况吗?非常感谢

以上是关于Spark-SQL 数据帧外部数据源效率低的主要内容,如果未能解决你的问题,请参考以下文章

控制 spark-sql 和数据帧中的字段可空性

Spark-sql读取hive分区表限制分区过滤条件及限制分区数量

spark-sql的概述以及编程模型的介绍

Spark-SQL:如何将 TSV 或 CSV 文件读入数据框并应用自定义模式?

如何在火花流中刷新加载的数据帧内容?

如何将列添加到依赖于数据帧或外部数据帧中组的平均值的数据帧?