Spark SubQuery 扫描整个分区
Posted
技术标签:
【中文标题】Spark SubQuery 扫描整个分区【英文标题】:Spark SubQuery scan whole partition 【发布时间】:2019-07-11 17:55:13 【问题描述】:我有一个按“日期”字段分区的配置单元表 我想写一个查询来获取最新(最大)分区的数据。
spark.sql("select field from table where date_of = '2019-06-23'").explain(True)
vs
spark.sql("select filed from table where date_of = (select max(date_of) from table)").explain(True)
以下是两个查询的物理计划
*(1) Project [qbo_company_id#120L]
+- *(1) FileScan parquet
table[qbo_company_id#120L,date_of#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1, PartitionFilters: [isnotnull(date_of#157), (cast(date_of#157 as string) = 2019-06-23)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>
*(1) Project [qbo_company_id#1L]
+- *(1) Filter (date_of#38 = Subquery subquery0)
: +- Subquery subquery0
: +- *(2) HashAggregate(keys=[], functions=[max(date_of#76)], output=[max(date_of)#78])
: +- Exchange SinglePartition
: +- *(1) HashAggregate(keys=[], functions=[partial_max(date_of#76)], output=[max#119])
: +- LocalTableScan [date_of#76]
+- *(1) FileScan parquet
table[qbo_company_id#1L,date_of#38] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1836, PartitionFilters: [isnotnull(date_of#38)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>
为什么子查询扫描整个分区而不是选择最新的分区?借助有关分区的元数据,为什么不能只扫描所需的分区?
【问题讨论】:
嗨@Selvam date_of 的类型是什么?如果是日期或时间戳字段,max 函数将起作用 基于这里的良好讨论,我将其解码为 pyspark。以防万一有人想参考 Pyspark 中的解决方案。 ***.com/questions/55053218/… 【参考方案1】:如果我是你……我更喜欢不同的方法,而不是 sql 查询和全表扫描。
spark.sql(s"show partitions $tablename")
然后,我会将其转换为具有 joda 日期列的 Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]
/**
* listMyHivePartitions - lists hive partitions as sequence of map
* @param tableName String
* @param spark SparkSession
* @return Seq[Map[String, DateTime]]
*/
def listMyHivePartitions(tableName :String,spark:SparkSession) : Seq[Map[String, DateTime]] =
println(s"Listing the keys from $tableName")
val partitions: Seq[String] = spark.sql(s"show partitions $tableName").collect().map(row =>
println(s" Identified Key: $row.toString()")
row.getString(0)
).toSeq
println(s"Fetched $partitions.size partitons from $tableName")
partitions.map(key => key.split("/").toSeq.map(keyVal =>
val keyValSplit = keyVal.split("=")
(keyValSplit(0).toLowerCase().trim, new DateTime(keyValSplit(1).trim))
).toMap)
将适用
getRecentPartitionDate
如下所示
/**
* getRecentPartitionDate.
*
* @param column String
* @param seqOfMap @see Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]
**/
def getRecentPartitionDate(column: String, seqOfMap: Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]): Option[Map[String, DateTime]] =
logger.info(" >>>>> column " + column)
val mapWithMostRecentBusinessDate = seqOfMap.sortWith(
(a, b) =>
logger.debug(a(column).toString() + " col2" + b(column).toString())
a(column).isAfter(b(column))
)
logger.debug(s" mapWithMostRecentBusinessDate: $mapWithMostRecentBusinessDate , \n Head = $mapWithMostRecentBusinessDate.headOption ")
mapWithMostRecentBusinessDate.headOption
优点是没有 sqls 没有全表扫描...
当您从后端数据库 hivemetastore 查询时也可以应用上述内容,将文件显示分区表,查询结果为java.sql.ResultSet
/**
* showParts.
*
* @param table
* @param config
* @param stmt
*/
def showParts(table: String, config: Config, stmt: Statement): Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]] =
val showPartitionsCmd = " show partitions " + table;
logger.info("showPartitionsCmd " + showPartitionsCmd)
try
val resultSet = stmt.executeQuery(showPartitionsCmd)
// checkData(resultSet)
val result = resultToSeq(resultSet);
logger.info(s"partitions of $table ->" + showPartitionsCmd + table);
logger.debug("result " + result)
result
catch
case e: Exception => logger.error(s"Exception occurred while show partitions table $table..", e)
null
/** *
* resultToSeq.
*
* @param queryResult
*/
def resultToSeq(queryResult: ResultSet) =
val md = queryResult.getMetaData
val colNames = for (i <- 1 to md.getColumnCount) yield md.getColumnName(i)
var rows = Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]()
while (queryResult.next())
var row = scala.collection.immutable.Map.empty[String, DateTime]
for (n <- colNames)
val str = queryResult.getString(n).split("=")
//str.foreach(logger.info)
import org.joda.time.format.DateTimeFormat
val format = DateTimeFormat.forPattern("yyyy-MM-dd")
row += str(0) -> DateTime.parse(str(1)) //.toString(DateTimeFormat.shortDate())
logger.debug(row.toString())
rows = rows :+ row
rows
在获得地图序列后,我将在顶部应用 def,即getRecentPartitionDate
【讨论】:
代替这个我可以做 spark.sql("select max(field) from table").first()/collect.然后我可以在字符串查询中传递这个值。我不是在问如何提高查询性能,而是为什么子查询要进行全扫描。 子查询在这种情况下对表进行全扫描。在正常的 sql 情况下也是如此 不要使用查询来获取最新的分区。是我所说的要点......在上述方法中显示分区命令未使用查询或子查询,并且性能问题复杂化的东西是我所说的......希望你能正确理解。 如果您尝试过,是的,我们可以,但问题是我们在时间格式等方面没有最低控制权。这就是我以这种方式完成的一个项目中更可控的原因。再次 orderBy 是数据帧级别的排序,它可能会洗牌。我所做的是简单的 scala 函数级别排序......所以这些是不同的。我想我可以使用显示分区来提出想法......是这个答案的要点。其余一切都基于您的舒适度和要求。我已经提到了实现目标的两种强大而简单的方法。 AFAIK 在这种情况下,它将对底层数据源和优化进行全面扫描。这就是发生全面扫描的原因【参考方案2】:以 Ram 的回答为基础,有一种更简单的方法来完成此任务,通过直接查询 Hive 元存储而不是执行 Spark-SQL 查询来消除大量开销。无需重新发明***:
import org.apache.hadoop.hive.conf.HiveConf
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
val cli = new HiveMetaStoreClient(hiveConf)
val maxPart = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue).asScala.map(_.getValues.asScala.mkString(",")).max
【讨论】:
以上是关于Spark SubQuery 扫描整个分区的主要内容,如果未能解决你的问题,请参考以下文章
SPARK闲杂--为什么复用Exchange和subquery