Spark-SQL CLI:未调用 SupportsPushDownFilters.pushFilters

Posted

技术标签:

【中文标题】Spark-SQL CLI:未调用 SupportsPushDownFilters.pushFilters【英文标题】:Spark-SQL CLI: SupportsPushDownFilters.pushFilters not called 【发布时间】:2018-08-15 23:24:58 【问题描述】:

尝试使用 SupportsPushDownFilters 实现 DataSourceV2。在 spark 2.3.1、spark-sql CLI 和 spark-shell 中对其进行测试。

问题:从 spark-sql 运行查询时未调用 SupportsPushDownFilters.pushFilters(我的断点未命中),但直接使用 DataFrame 时调用。

我的代码:

class DefaultSource extends ReadSupport
   with DataSourceRegister
   with RelationProvider 

  def createReader(options: DataSourceOptions) = 
     val path = options.get("path").get
     val sc = SparkSession.builder.getOrCreate().sparkContext
     val conf = sc.hadoopConfiguration
     new MyDataSourceReader(path, conf)
  


class MyDataSourceReader(path: String, conf:Configuration)
  extends DataSourceReader
  with SupportsPushDownFilters 

  override def pushFilters(filters: Array[Filter]): Array[Filter] = 
    println(filters.toList)
    filters
  

直接使用 DataFrames 或 spark.sql API 时推送的过滤器(注意控制台输出打印了过滤器):

scala> val df=spark.read.format("com.my.spark.datasource.csv2").load("test.csv2")
scala> df.filter("age>24").show
List(IsNotNull(age), GreaterThan(age,24))
+----+---+----------+
|name|age|      addr|
+----+---+----------+
| Ann| 25|one st. 12|
|Mary| 27|one st. 14|
+----+---+----------+

scala> df.createOrReplaceTempView("v1")
scala> spark.sql("select * from v1 where age>24").show
List(IsNotNull(age), GreaterThan(age,24))
+----+---+----------+
|name|age|      addr|
+----+---+----------+
| Ann| 25|one st. 12|
|Mary| 27|one st. 14|
+----+---+----------+

当从 SQL CLI 运行相同的查询时,过滤器不会被按下,(在 CLI 输出中没有什么可以确认的,只是显示了查询的执行方式。调试我的数据源时我的断点没有命中):

E:\git\spark-2.3.0>bin\spark-sql
Listening for transport dt_socket at address: 5005
spark-sql> CREATE TEMPORARY VIEW v1 USING com.my.spark.datasource.csv2 OPTIONS 
(path "test.csv2");
Time taken: 2.188 seconds
18/08/16 09:46:52 INFO SparkSQLCLIDriver: Time taken: 2.188 seconds

spark-sql> select * from v1 where age>24;
18/08/16 09:47:22 INFO DAGScheduler: Job 0 finished: processCmd at 
CliDriver.java:376, took 12.326064 s
Ann     25      one st. 12
Mary    27      one st. 14
Time taken: 13.862 seconds, Fetched 2 row(s)
18/08/16 09:47:22 INFO SparkSQLCLIDriver: Time taken: 13.862 seconds, Fetched 2 row(s)

spark-sql> select * from (select * from v1 where age>24) t1;
Ann     25      one st. 12
Mary    27      one st. 14
Time taken: 0.146 seconds, Fetched 2 row(s)
18/08/16 09:47:37 INFO SparkSQLCLIDriver: Time taken: 0.146 seconds, Fetched 2  row(s)

调试 spark 引擎,据我所知,问题是“从 spark-sql CLI 调用”路径不会在计划中生成 DataSourceV2Relation,而它是在 DataFrameReader.load 中创建的。

我是否遗漏了什么并且需要做任何额外的事情来让过滤器在 spark-sql 上下推?还是已知问题?

【问题讨论】:

我不确定我是否完全理解。我也在阅读这方面的内容,但我很确定我了解到,如果我向 mysql JDBC 调用添加 where 子句,则下推到 mySQL 将在服务器端自动发生,提供类似这样的内容已完成:sql = "(select * from mytable where day = 2016-11-25 and hour = 10) t1"。注意 t1。我注意到您在这里没有执行此类查询,但我在其他地方读到这也适用于 JDBC 源。如果你能澄清一下。 更新的问题(包装到子查询也无济于事)。我是新来的火花,所以可能是错的。 AFAIK SQL CLI(我运行它的方式)不使用 JDBC(与通过直线运行相反)并且直接针对 spark SQL 驱动程序运行类似 SQL 的文本查询,它应该以与 SCALA API 相同的方式解释它。至少它没有过滤器,过滤器甚至没有到达我的自定义数据源。我不确定 MySql DS 是如何实现的,如果它确实使用 SQL CLI 下推过滤器,那么我在这里做错了,但无法弄清楚它是什么。 我现在也很困惑。周末看看 db-blog.web.cern.ch/blog/luca-canali/… 这是一篇有趣的文章 【参考方案1】:

浏览使用 Filter 的 spark 代码发现了这个 PrunedFilteredScan。事实证明,在执行 SQL CLI 查询时需要实现接收过滤器(buildScan):

class DefaultSource extends ReadSupport
      with DataSourceRegister
      with RelationProvider


  def createReader(options: DataSourceOptions) = 
    val path = options.get("path").get
    val sc = SparkSession.builder.getOrCreate().sparkContext
    val conf = sc.hadoopConfiguration
    new MyDataSourceReader(path, conf)
  

  override def shortName(): String = "csv2"

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new Csv2Relation(sqlContext, parameters("path"))


class Csv2Relation(context:SQLContext, path:String)
  extends BaseRelation
    with PrunedFilteredScan

  val _sqlContext = context;
  val _path = path;

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = 
    // *** receive filters here when running in SQL CLI
    val sc = SparkSession.builder.getOrCreate().sparkContext
    val conf = sc.hadoopConfiguration
    new DataSourceRDD(sc, new MyDataSourceReader(_path, conf).createDataReaderFactories())
  

因此需要实现 2 个接口才能使其在所有场景中工作:PrunedFilteredScan + (SupportsPushDownFilters 或 SupportsPushDownCatalystFilters)。

【讨论】:

以上是关于Spark-SQL CLI:未调用 SupportsPushDownFilters.pushFilters的主要内容,如果未能解决你的问题,请参考以下文章

spark-sql(spark sql cli)客户端集成hive

Spark-sql CLI 在运行查询时仅使用 1 个执行程序

Laravel 5.2 - 调用未定义的方法 Illuminate\Support\Facades\Request::session()

Spark-Sql整合hive,在spark-sql命令和spark-shell命令下执行sql命令和整合调用hive

Spark-SQL之DataFrame操作大全

Spark-SQL之DataFrame操作大全