Spark SubQuery 扫描整个分区

Posted

技术标签:

【中文标题】Spark SubQuery 扫描整个分区【英文标题】:Spark SubQuery scan whole partition 【发布时间】:2019-07-11 17:55:13 【问题描述】:

我有一个按“日期”字段分区的配置单元表 我想写一个查询来获取最新(最大)分区的数据。

spark.sql("select field from table  where date_of = '2019-06-23'").explain(True)
vs 
spark.sql("select filed from table where date_of = (select max(date_of) from table)").explain(True)

以下是两个查询的物理计划

*(1) Project [qbo_company_id#120L]
        +- *(1) FileScan parquet 
    table[qbo_company_id#120L,date_of#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1, PartitionFilters: [isnotnull(date_of#157), (cast(date_of#157 as string) = 2019-06-23)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>

*(1) Project [qbo_company_id#1L]
+- *(1) Filter (date_of#38 = Subquery subquery0)
   :  +- Subquery subquery0
   :     +- *(2) HashAggregate(keys=[], functions=[max(date_of#76)], output=[max(date_of)#78])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(date_of#76)], output=[max#119])
   :              +- LocalTableScan [date_of#76]
   +- *(1) FileScan parquet 
table[qbo_company_id#1L,date_of#38] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1836, PartitionFilters: [isnotnull(date_of#38)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>

为什么子查询扫描整个分区而不是选择最新的分区?借助有关分区的元数据,为什么不能只扫描所需的分区?

【问题讨论】:

嗨@Selvam date_of 的类型是什么?如果是日期或时间戳字段,max 函数将起作用 基于这里的良好讨论,我将其解码为 pyspark。以防万一有人想参考 Pyspark 中的解决方案。 ***.com/questions/55053218/… 【参考方案1】:

如果我是你……我更喜欢不同的方法,而不是 sql 查询和全表扫描。

spark.sql(s"show partitions $tablename")

然后,我会将其转换为具有 joda 日期列的 Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]

/**
    * listMyHivePartitions - lists hive partitions as sequence of map
    * @param tableName String
    * @param spark SparkSession
    * @return Seq[Map[String, DateTime]]
    */
  def listMyHivePartitions(tableName :String,spark:SparkSession) : Seq[Map[String, DateTime]]  = 
    println(s"Listing the keys from $tableName")
    val partitions: Seq[String] = spark.sql(s"show partitions $tableName").collect().map(row => 
      println(s" Identified Key: $row.toString()")
      row.getString(0)
    ).toSeq
    println(s"Fetched $partitions.size  partitons from $tableName")
    partitions.map(key => key.split("/").toSeq.map(keyVal => 
      val keyValSplit = keyVal.split("=")
      (keyValSplit(0).toLowerCase().trim, new DateTime(keyValSplit(1).trim))
    ).toMap)
  

将适用

getRecentPartitionDate 如下所示

/**
    * getRecentPartitionDate.
    *
    * @param column   String
    * @param seqOfMap  @see Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]
    **/
  def getRecentPartitionDate(column: String, seqOfMap: Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]): Option[Map[String, DateTime]] = 
    logger.info(" >>>>> column " + column)
    val mapWithMostRecentBusinessDate = seqOfMap.sortWith(
      (a, b) => 
        logger.debug(a(column).toString() + " col2" + b(column).toString())
        a(column).isAfter(b(column))
      
    )

    logger.debug(s" mapWithMostRecentBusinessDate: $mapWithMostRecentBusinessDate , \n Head = $mapWithMostRecentBusinessDate.headOption ")

    mapWithMostRecentBusinessDate.headOption
  

优点是没有 sqls 没有全表扫描...

当您从后端数据库 hivemetastore 查询时也可以应用上述内容,将文件显示分区表,查询结果为java.sql.ResultSet

 /**
        * showParts.
        *
        * @param table
        * @param config
        * @param stmt
        */
      def showParts(table: String, config: Config, stmt: Statement): Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]] = 
        val showPartitionsCmd = " show partitions " + table;
        logger.info("showPartitionsCmd " + showPartitionsCmd)
        try 
          val resultSet = stmt.executeQuery(showPartitionsCmd)

          // checkData(resultSet)
          val result = resultToSeq(resultSet);
          logger.info(s"partitions of $table ->" + showPartitionsCmd + table);
          logger.debug("result " + result)

          result
        
        catch 
          case e: Exception => logger.error(s"Exception occurred while show partitions table $table..", e)
            null
        
      

      /** *
        * resultToSeq.
        *
        * @param queryResult
        */
      def resultToSeq(queryResult: ResultSet) = 
        val md = queryResult.getMetaData

        val colNames = for (i <- 1 to md.getColumnCount) yield md.getColumnName(i)
        var rows = Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]()
        while (queryResult.next()) 
          var row = scala.collection.immutable.Map.empty[String, DateTime]
          for (n <- colNames) 
            val str = queryResult.getString(n).split("=")

            //str.foreach(logger.info)
            import org.joda.time.format.DateTimeFormat
            val format = DateTimeFormat.forPattern("yyyy-MM-dd")
            row += str(0) -> DateTime.parse(str(1)) //.toString(DateTimeFormat.shortDate())
            logger.debug(row.toString())
          
          rows = rows :+ row
        

        rows
      

在获得地图序列后,我将在顶部应用 def,即getRecentPartitionDate

【讨论】:

代替这个我可以做 spark.sql("select max(field) from table").first()/collect.然后我可以在字符串查询中传递这个值。我不是在问如何提高查询性能,而是为什么子查询要进行全扫描。 子查询在这种情况下对表进行全扫描。在正常的 sql 情况下也是如此 不要使用查询来获取最新的分区。是我所说的要点......在上述方法中显示分区命令未使用查询或子查询,并且性能问题复杂化的东西是我所说的......希望你能正确理解。 如果您尝试过,是的,我们可以,但问题是我们在时间格式等方面没有最低控制权。这就是我以这种方式完成的一个项目中更可控的原因。再次 orderBy 是数据帧级别的排序,它可能会洗牌。我所做的是简单的 scala 函数级别排序......所以这些是不同的。我想我可以使用显示分区来提出想法......是这个答案的要点。其余一切都基于您的舒适度和要求。我已经提到了实现目标的两种强大而简单的方法。 AFAIK 在这种情况下,它将对底层数据源和优化进行全面扫描。这就是发生全面扫描的原因【参考方案2】:

以 Ram 的回答为基础,有一种更简单的方法来完成此任务,通过直接查询 Hive 元存储而不是执行 Spark-SQL 查询来消除大量开销。无需重新发明***:

import org.apache.hadoop.hive.conf.HiveConf
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
val cli = new HiveMetaStoreClient(hiveConf)
val maxPart = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(",")).max

【讨论】:

以上是关于Spark SubQuery 扫描整个分区的主要内容,如果未能解决你的问题,请参考以下文章

SPARK闲杂--为什么复用Exchange和subquery

SPARK闲杂--为什么复用Exchange和subquery

通过联接合并到 BigQuery 分区表中,无需扫描整个表

7.spark core之数据分区

Spark和Java API分区

spark数据分区数量的原理