Spark-SQL CLI:未调用 SupportsPushDownFilters.pushFilters
Posted
技术标签:
【中文标题】Spark-SQL CLI:未调用 SupportsPushDownFilters.pushFilters【英文标题】:Spark-SQL CLI: SupportsPushDownFilters.pushFilters not called 【发布时间】:2018-08-15 23:24:58 【问题描述】:尝试使用 SupportsPushDownFilters 实现 DataSourceV2。在 spark 2.3.1、spark-sql CLI 和 spark-shell 中对其进行测试。
问题:从 spark-sql 运行查询时未调用 SupportsPushDownFilters.pushFilters(我的断点未命中),但直接使用 DataFrame 时调用。
我的代码:
class DefaultSource extends ReadSupport
with DataSourceRegister
with RelationProvider
def createReader(options: DataSourceOptions) =
val path = options.get("path").get
val sc = SparkSession.builder.getOrCreate().sparkContext
val conf = sc.hadoopConfiguration
new MyDataSourceReader(path, conf)
class MyDataSourceReader(path: String, conf:Configuration)
extends DataSourceReader
with SupportsPushDownFilters
override def pushFilters(filters: Array[Filter]): Array[Filter] =
println(filters.toList)
filters
直接使用 DataFrames 或 spark.sql API 时推送的过滤器(注意控制台输出打印了过滤器):
scala> val df=spark.read.format("com.my.spark.datasource.csv2").load("test.csv2")
scala> df.filter("age>24").show
List(IsNotNull(age), GreaterThan(age,24))
+----+---+----------+
|name|age| addr|
+----+---+----------+
| Ann| 25|one st. 12|
|Mary| 27|one st. 14|
+----+---+----------+
scala> df.createOrReplaceTempView("v1")
scala> spark.sql("select * from v1 where age>24").show
List(IsNotNull(age), GreaterThan(age,24))
+----+---+----------+
|name|age| addr|
+----+---+----------+
| Ann| 25|one st. 12|
|Mary| 27|one st. 14|
+----+---+----------+
当从 SQL CLI 运行相同的查询时,过滤器不会被按下,(在 CLI 输出中没有什么可以确认的,只是显示了查询的执行方式。调试我的数据源时我的断点没有命中):
E:\git\spark-2.3.0>bin\spark-sql
Listening for transport dt_socket at address: 5005
spark-sql> CREATE TEMPORARY VIEW v1 USING com.my.spark.datasource.csv2 OPTIONS
(path "test.csv2");
Time taken: 2.188 seconds
18/08/16 09:46:52 INFO SparkSQLCLIDriver: Time taken: 2.188 seconds
spark-sql> select * from v1 where age>24;
18/08/16 09:47:22 INFO DAGScheduler: Job 0 finished: processCmd at
CliDriver.java:376, took 12.326064 s
Ann 25 one st. 12
Mary 27 one st. 14
Time taken: 13.862 seconds, Fetched 2 row(s)
18/08/16 09:47:22 INFO SparkSQLCLIDriver: Time taken: 13.862 seconds, Fetched 2 row(s)
spark-sql> select * from (select * from v1 where age>24) t1;
Ann 25 one st. 12
Mary 27 one st. 14
Time taken: 0.146 seconds, Fetched 2 row(s)
18/08/16 09:47:37 INFO SparkSQLCLIDriver: Time taken: 0.146 seconds, Fetched 2 row(s)
调试 spark 引擎,据我所知,问题是“从 spark-sql CLI 调用”路径不会在计划中生成 DataSourceV2Relation,而它是在 DataFrameReader.load 中创建的。
我是否遗漏了什么并且需要做任何额外的事情来让过滤器在 spark-sql 上下推?还是已知问题?
【问题讨论】:
我不确定我是否完全理解。我也在阅读这方面的内容,但我很确定我了解到,如果我向 mysql JDBC 调用添加 where 子句,则下推到 mySQL 将在服务器端自动发生,提供类似这样的内容已完成:sql = "(select * from mytable where day = 2016-11-25 and hour = 10) t1"。注意 t1。我注意到您在这里没有执行此类查询,但我在其他地方读到这也适用于 JDBC 源。如果你能澄清一下。 更新的问题(包装到子查询也无济于事)。我是新来的火花,所以可能是错的。 AFAIK SQL CLI(我运行它的方式)不使用 JDBC(与通过直线运行相反)并且直接针对 spark SQL 驱动程序运行类似 SQL 的文本查询,它应该以与 SCALA API 相同的方式解释它。至少它没有过滤器,过滤器甚至没有到达我的自定义数据源。我不确定 MySql DS 是如何实现的,如果它确实使用 SQL CLI 下推过滤器,那么我在这里做错了,但无法弄清楚它是什么。 我现在也很困惑。周末看看 db-blog.web.cern.ch/blog/luca-canali/… 这是一篇有趣的文章 【参考方案1】:浏览使用 Filter 的 spark 代码发现了这个 PrunedFilteredScan。事实证明,在执行 SQL CLI 查询时需要实现接收过滤器(buildScan):
class DefaultSource extends ReadSupport
with DataSourceRegister
with RelationProvider
def createReader(options: DataSourceOptions) =
val path = options.get("path").get
val sc = SparkSession.builder.getOrCreate().sparkContext
val conf = sc.hadoopConfiguration
new MyDataSourceReader(path, conf)
override def shortName(): String = "csv2"
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation =
new Csv2Relation(sqlContext, parameters("path"))
class Csv2Relation(context:SQLContext, path:String)
extends BaseRelation
with PrunedFilteredScan
val _sqlContext = context;
val _path = path;
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
// *** receive filters here when running in SQL CLI
val sc = SparkSession.builder.getOrCreate().sparkContext
val conf = sc.hadoopConfiguration
new DataSourceRDD(sc, new MyDataSourceReader(_path, conf).createDataReaderFactories())
因此需要实现 2 个接口才能使其在所有场景中工作:PrunedFilteredScan + (SupportsPushDownFilters 或 SupportsPushDownCatalystFilters)。
【讨论】:
以上是关于Spark-SQL CLI:未调用 SupportsPushDownFilters.pushFilters的主要内容,如果未能解决你的问题,请参考以下文章
spark-sql(spark sql cli)客户端集成hive
Spark-sql CLI 在运行查询时仅使用 1 个执行程序
Laravel 5.2 - 调用未定义的方法 Illuminate\Support\Facades\Request::session()