Some notes on DS V2 push down in Spark
Posted by 鸿乃江边鸟
Background
This article is based on Spark 3.3.0.
In a previous article, 《SPARK中的FileSourceStrategy,DataSourceStrategy以及DataSourceV2Strategy规则》, we mentioned the DS V2 push down features, such as complex push down for JDBC and aggregate push down for Parquet. Behind all of this sits one important piece of context: the TableCatalog class.
Conclusion
To state the conclusion first: the prerequisite for these push downs (aggregate push down included) is that the corresponding catalog has been configured in Spark, for example:
spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
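In practice that amounts to something like the following (a minimal sketch; the H2 url/driver options, the test.employee table and the query are illustrative assumptions, not taken from this article):

import org.apache.spark.sql.SparkSession

// Register a DS V2 JDBC catalog named "h2"; tables resolved through it become
// DataSourceV2Relation nodes and are therefore eligible for V2ScanRelationPushDown.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")   // assumed H2 URL
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")     // assumed driver
  .getOrCreate()

// Query a table through the "h2" catalog; filters/aggregates can be pushed to the source.
spark.sql("SELECT dept, MAX(salary) FROM h2.test.employee GROUP BY dept").show()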
Analysis
Within the rule V2ScanRelationPushDown, which applies a sequence of push-down steps, the first step is createScanBuilder:
private def createScanBuilder(plan: LogicalPlan) = plan.transform {
  case r: DataSourceV2Relation =>
    ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
}
Only a DataSourceV2Relation, i.e. a DS v2 relation, is converted into a ScanBuilderHolder; the subsequent steps pushDownFilters and pushDownAggregates then operate on that ScanBuilderHolder (the DS v2 specific rewrites are only applied when a ScanBuilderHolder node is encountered). So the key question is where the DataSourceV2Relation comes from.
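To check whether this actually happened for a given query, you can look at the physical plan (a sketch, reusing the h2 catalog configured above; the table name is an assumed example):

// Run EXPLAIN against the DS v2 catalog and inspect the scan description.
// When V2ScanRelationPushDown has applied, the JDBC scan usually lists entries
// such as "PushedFilters: ..." and "PushedAggregates: ..." in its description.
val df = spark.sql(
  "SELECT dept, MAX(salary) FROM h2.test.employee WHERE dept > 0 GROUP BY dept")
df.explain(true)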
Straight to the point:
In the rule ResolveRelations, an UnresolvedRelation is converted into either a DataSourceV2Relation or an UnresolvedCatalogRelation:
object ResolveRelations extends Rule[LogicalPlan] {
  ...
  def apply(plan: LogicalPlan): LogicalPlan =
    plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
      case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
        val relation = table match {
          case u: UnresolvedRelation if !u.isStreaming =>
            lookupRelation(u).getOrElse(u)
          case other => other
        }
        ...
    }
}
Here lookupRelation decides between DS V1 and DS V2 based on whether a corresponding catalog has been registered:
private def lookupRelation(
    u: UnresolvedRelation,
    timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
  lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
    expandIdentifier(u.multipartIdentifier) match {
      case CatalogAndIdentifier(catalog, ident) =>
        val key = catalog.name +: ident.namespace :+ ident.name
        AnalysisContext.get.relationCache.get(key).map(_.transform {
          case multi: MultiInstanceRelation =>
            val newRelation = multi.newInstance()
            newRelation.copyTagsFrom(multi)
            newRelation
        }).orElse {
          val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
          val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
          loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
          loaded
        }
      case _ => None
    }
  }
}

...
private def expandIdentifier(nameParts: Seq[String]): Seq[String] = {
  if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts

  if (nameParts.length == 1) {
    AnalysisContext.get.catalogAndNamespace :+ nameParts.head
  } else if (catalogManager.isCatalogRegistered(nameParts.head)) {
    nameParts
  } else {
    AnalysisContext.get.catalogAndNamespace.head +: nameParts
  }
}
object CatalogAndIdentifier {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

  private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)

  def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
    assert(nameParts.nonEmpty)
    if (nameParts.length == 1) {
      Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
    } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
      // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
      // API does not support view yet, and we have to use v1 commands to deal with global temp
      // views. To simplify the implementation, we put global temp views in a special namespace
      // in the session catalog. The special namespace has higher priority during name resolution.
      // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
      // this custom catalog can't be accessed.
      Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
    } else {
      try {
        Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
      } catch {
        case _: CatalogNotFoundException =>
          Some((currentCatalog, nameParts.asIdentifier))
      }
    }
  }
}
The expandIdentifier method, combined with the CatalogAndIdentifier.unapply extractor, determines the following:
- 1. If no catalog is specified, the default catalog is the v2SessionCatalog, whose name is "spark_catalog" (Spark's default session catalog name); go to step 3. Example SQL: select a from table
- 2. If a catalog is specified and it has been registered (e.g. via spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog), that catalog is used (here JDBCTableCatalog, with the name "h2"); go to step 3. Example SQL: select a from h2.table
- 3. Call CatalogV2Util.loadTable, i.e. the loadTable method of the chosen catalog, to obtain the corresponding Table: the V2SessionCatalog returns a V1Table, while the JDBCTableCatalog returns a JDBCTable (see the sketch after this list).
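Putting steps 1–3 together, the two SQL shapes resolve roughly as follows (an illustrative sketch with assumed table names):

// Step 1: no catalog prefix -> resolved through "spark_catalog" (the v2SessionCatalog),
// whose loadTable returns a V1Table.
spark.sql("SELECT a FROM some_table")

// Step 2: explicit "h2" prefix -> resolved through the registered JDBCTableCatalog,
// whose loadTable returns a JDBCTable (a DS v2 Table).
spark.sql("SELECT a FROM h2.some_table")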
This way, in the next step, the createRelation method converts it into a different relation depending on the case:
private def createRelation(
    catalog: CatalogPlugin,
    ident: Identifier,
    table: Option[Table],
    options: CaseInsensitiveStringMap,
    isStreaming: Boolean): Option[LogicalPlan] = {
  ...
    case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) =>
      if (isStreaming) {
        if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
          throw QueryCompilationErrors.permanentViewNotSupportedByStreamingReadingAPIError(
            ident.quoted)
        }
        SubqueryAlias(
          catalog.name +: ident.asMultipartIdentifier,
          UnresolvedCatalogRelation(v1Table.v1Table, options, isStreaming = true))
      } else {
        v1SessionCatalog.getRelation(v1Table.v1Table, options)
      }
  ...
    case table =>
      ...
      } else {
        SubqueryAlias(
          catalog.name +: ident.asMultipartIdentifier,
          DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
      }
}
- If it is a V1Table, it is converted into an UnresolvedCatalogRelation, which the rule FindDataSourceTable later turns into a LogicalRelation. This is where lookupDataSource, i.e. the registered datasource (such as "org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider", or "org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2", which currently is not matched by a case here), comes into play (implemented in the providingInstance() method).
- Otherwise it is converted into a DataSourceV2Relation, on which the rule V2ScanRelationPushDown then performs the series of push-down optimizations (see the sketch below).
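The difference is easy to observe in the analyzed plan (a sketch; "people" is a hypothetical table in the session catalog and "h2.people" a hypothetical table in the JDBC catalog):

// V1 path: the session catalog returns a V1Table, which ends up as a LogicalRelation
// once FindDataSourceTable has run.
println(spark.sql("SELECT * FROM people").queryExecution.analyzed.treeString)

// V2 path: the JDBCTableCatalog returns a JDBCTable, which stays a DataSourceV2Relation
// and can later be rewritten by V2ScanRelationPushDown.
println(spark.sql("SELECT * FROM h2.people").queryExecution.analyzed.treeString)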
So for a JDBC catalog, to get the DS V2 optimizations you have to configure:
spark.sql.catalog.h2=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
If you want the same DS v2 optimizations for other sources, such as Parquet, you have to implement a corresponding TableCatalog and then configure it:
spark.sql.catalog.parquet=<your TableCatalog implementation class>
About TableCatalog
Currently, both the JDBC datasource and the JDBC TableCatalog are already implemented in Spark:
## datasource
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
## TableCatalog
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
If you want to implement another datasource and its catalog, you can refer to the JDBC implementation (note that the current JDBC source is still implemented on top of DS V1; it is better to build on DS V2, see for example ParquetDataSourceV2). A minimal skeleton is sketched below.
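To give an idea of the surface involved, here is a bare TableCatalog skeleton (a sketch only: MyTableCatalog and its unimplemented bodies are assumptions; the real work lies in returning a Table whose ScanBuilder implements the SupportsPushDown* interfaces):

import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Skeleton of a custom TableCatalog; register it with e.g.
//   spark.sql.catalog.mycat=com.example.MyTableCatalog
class MyTableCatalog extends TableCatalog {
  private var catalogName: String = _

  // Called once with the catalog name and all "spark.sql.catalog.<name>.*" options.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
  }

  override def name(): String = catalogName

  override def listTables(namespace: Array[String]): Array[Identifier] = Array.empty

  // This is what CatalogV2Util.loadTable ends up calling; return a DS v2 Table whose
  // ScanBuilder implements SupportsPushDownFilters / SupportsPushDownAggregates, etc.
  override def loadTable(ident: Identifier): Table =
    throw new UnsupportedOperationException("not implemented in this sketch")

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table =
    throw new UnsupportedOperationException("not implemented in this sketch")

  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    throw new UnsupportedOperationException("not implemented in this sketch")

  override def dropTable(ident: Identifier): Boolean = false

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
    throw new UnsupportedOperationException("not implemented in this sketch")
}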
There is also some discussion around this in SPARK-28396.
For a more in-depth look at the DS V2 push down features, see 《技术前沿|Spark 3.3.0 中 DS V2 Push-down 的重构与新特性》.