SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题相关的知识,希望对你有一定的参考价值。
背景
在spark中,有时候会报出running ‘REFRESH TABLE tableName’ command in SQL or by recreating the Dataset/DataFrame involved.的错误,这种错误的原因有一种隐形的原因,那就是InMemoryFileIndex会缓存需要scan的文件在内存中,
分析
在scan file的过程中,最主要涉及的是CatalogFileIndex类,该类中的方法filterPartitions会创建InMemoryFileIndex:
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex =
if (table.partitionColumnNames.nonEmpty)
val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val timeNs = System.nanoTime() - startTime
new InMemoryFileIndex(sparkSession,
rootPathsSpecified = partitionSpec.partitions.map(_.path),
parameters = Map.empty,
userSpecifiedSchema = Some(partitionSpec.partitionColumns),
fileStatusCache = fileStatusCache,
userSpecifiedPartitionSpec = Some(partitionSpec),
metadataOpsTimeNs = Some(timeNs))
else
new InMemoryFileIndex(sparkSession, rootPaths, parameters = table.storage.properties,
userSpecifiedSchema = None, fileStatusCache = fileStatusCache)
而在该InMemoryFileIndex中有成员变量fileStatusCache,该成员变量的赋值通过*FileStatusCache.getOrCreate(sparkSession)*而来:
def getOrCreate(session: SparkSession): FileStatusCache = synchronized
if (session.sqlContext.conf.manageFilesourcePartitions &&
session.sqlContext.conf.filesourcePartitionFileCacheSize > 0)
if (sharedCache == null)
sharedCache = new SharedInMemoryCache(
session.sqlContext.conf.filesourcePartitionFileCacheSize,
session.sqlContext.conf.metadataCacheTTL
)
sharedCache.createForNewClient()
else
NoopCache
而SharedInMemoryCache的会调用guava的CacheBuilder方法:
var builder = CacheBuilder.newBuilder()
.weigher(weigher)
.removalListener(removalListener)
.maximumWeight(maxSizeInBytes / weightScale)
if (cacheTTL > 0)
builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
builder.build[(ClientId, Path), Array[FileStatus]]()
该方法会对最终要scan的文件进行缓存处理,该缓存文件的调用路径如下:
FileSourceScanExec.doExecute => inputRDD => createBucketedReadRDD或者createNonBucketedReadRDD => dynamicallySelectedPartitions => selectedPartitions
||
\\/
CatalogFileIndex.listFiles => filterPartitions
||
\\/
InMemoryFileIndex.listFiles => partitionSpec => inferPartitioning => leafDirToChildrenFiles
其中cachedLeafDirToChildrenFiles的值会在InMemoryFileIndex对象初始化的时候进行赋值,对应的方法为refresh0:
private def refresh0(): Unit =
val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
cachedPartitionSpec = null
listLeafFiles方法就会对调用fileStatusCache.getLeafFile从而获取到缓存的文件路径
结论
文中提到的 REFRESH TABLE tableName’ command in SQL错误,在两个不同的jvm进程中对于同一个表的读写中回经常出现,这里有三个参数可以去设置:
spark.sql.hive.filesourcePartitionFileCacheSize
spark.sql.hive.manageFilesourcePartitions
spark.sql.metadataCacheTTLSeconds
其他
-
那为什么在同一个jvm中对一个表的读写不会报这种错误呢?那是因为,在一个jvm中,比如说是写了之后再读取,会进行refresh操作,从而调用fileStatusCache.invalidateAll()方法,最终使文件缓存失效.
拿SQL:insert overwrite table a举例,该语句最终会生成InsertIntoHadoopFsRelationCommand物理计划,该run方法最终会调用fileIndex的refresh方法,从而调用*fileStatusCache.invalidateAll()*方法: -
spark.sql.metadataCacheTTLSeconds还有其他的作用
在开启spark.sql.hive.convertMetastoreParquet或者spark.sql.hive.convertMetastoreOrc的情况下,在转换对应的逻辑计划当中,如果缓存中存在对应的表,则会复用缓存中的,具体的方法在HiveMetastoreCatalog.convertToLogicalRelation 中 ,最主要的点是会公用同一个CatalogFileIndex对象,从而实现了文件的复用,从未导致问题
以上是关于SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题的主要内容,如果未能解决你的问题,请参考以下文章
使用 spark-csv 在 zeppelin 中读取 csv 文件
使用 Scala/Spark 列出目录中的文件(包括文件信息)