Delta Lake中CDC的实现
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Delta Lake中CDC的实现相关的知识,希望对你有一定的参考价值。
背景
本文基于delta 2.0.0
Delta是通过CDF(change data feed)来实现CDC(change data capture)。
CDF是能让表能够输出数据表变化的能力,CDC是能够捕获和识别数据的变化,并能够将变化的数据交给下游做进一步的处理。
我们来分析一下是怎么做到数据行级别的CDF的
分析
在设置delta.enableChangeDataFeed= true
的前提下(在Enable change data fedd有提及),我们分析一下逻辑计划DeltaDelete
对应的RunnableCommand DeleteCommand
的Run方法:
final override def run(sparkSession: SparkSession): Seq[Row] =
recordDeltaOperation(deltaLog, "delta.dml.delete")
deltaLog.assertRemovable()
deltaLog.withNewTransaction txn =>
val deleteActions = performDelete(sparkSession, deltaLog, txn)
if (deleteActions.nonEmpty)
txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
// Re-cache all cached plans(including this relation itself, if it's cached) that refer to
// this data source relation.
sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)
Seq.empty[Row]
最重要的方法是performDelete
,这里的performDelete
方法会根据condition来做不同的操作:
- 如果没有条件,那就是全部删除:
case None =>
// Case 1: Delete the whole table if the condition is true
val allFiles = txn.filterFiles(Nil)
numRemovedFiles = allFiles.size
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
val (numBytes, numPartitions) = totalBytesAndDistinctPartitionValues(allFiles)
numBytesRemoved = numBytes
numFilesBeforeSkipping = numRemovedFiles
numBytesBeforeSkipping = numBytes
numFilesAfterSkipping = numRemovedFiles
numBytesAfterSkipping = numBytes
if (txn.metadata.partitionColumns.nonEmpty)
numPartitionsAfterSkipping = Some(numPartitions)
numPartitionsRemovedFrom = Some(numPartitions)
numPartitionsAddedTo = Some(0)
val operationTimestamp = System.currentTimeMillis()
allFiles.map(_.removeWithTimestamp(operationTimestamp))
这里列举该表的所有文件,并把所有的文件标识为RemoveFile
,并带上时间戳。
-
如果是有筛选条件,则会根据筛选条件进行行级别的记录变更(这里还会根据情况具体分析):
- 能够从delta的元数据能够囊括过滤的条件,则只是在元数据层面进行修改:
case Some(cond) => val (metadataPredicates, otherPredicates) = DeltaTableUtils.splitMetadataAndDataPredicates( cond, txn.metadata.partitionColumns, sparkSession) numFilesBeforeSkipping = txn.snapshot.numOfFiles numBytesBeforeSkipping = txn.snapshot.sizeInBytes if (otherPredicates.isEmpty) // Case 2: The condition can be evaluated using metadata only. // Delete a set of files without the need of scanning any data files. val operationTimestamp = System.currentTimeMillis() val candidateFiles = txn.filterFiles(metadataPredicates) scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 numRemovedFiles = candidateFiles.size numBytesRemoved = candidateFiles.map(_.size).sum numFilesAfterSkipping = candidateFiles.size val (numCandidateBytes, numCandidatePartitions) = totalBytesAndDistinctPartitionValues(candidateFiles) numBytesAfterSkipping = numCandidateBytes if (txn.metadata.partitionColumns.nonEmpty) numPartitionsAfterSkipping = Some(numCandidatePartitions) numPartitionsRemovedFrom = Some(numCandidatePartitions) numPartitionsAddedTo = Some(0) candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
DeltaTableUtils.splitMetadataAndDataPredicates
是判断过滤条件是否是在元数据层能够囊括,如果可以的话,就通过val candidateFiles = txn.filterFiles(metadataPredicates)
来过滤出所有的文件,然后通过candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
在元数据层面进行删除.
2. 如果不仅仅是从元数据层面进行过滤,则会过滤出所有满足条件的数据:val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates) numFilesAfterSkipping = candidateFiles.size val (numCandidateBytes, numCandidatePartitions) = totalBytesAndDistinctPartitionValues(candidateFiles) numBytesAfterSkipping = numCandidateBytes if (txn.metadata.partitionColumns.nonEmpty) numPartitionsAfterSkipping = Some(numCandidatePartitions) val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) val fileIndex = new TahoeBatchFileIndex( sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot) // Keep everything from the resolved target except a new TahoeFileIndex // that only involves the affected files instead of all files. val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex) val data = Dataset.ofRows(sparkSession, newTarget) val deletedRowCount = metrics("numDeletedRows") val deletedRowUdf = udf () => deletedRowCount += 1 true .asNondeterministic() val filesToRewrite = withStatusCode("DELTA", FINDING_TOUCHED_FILES_MSG) if (candidateFiles.isEmpty) Array.empty[String] else data .filter(new Column(cond)) .filter(deletedRowUdf()) .select(new Column(InputFileName())).distinct() .as[String].collect() numRemovedFiles = filesToRewrite.length scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)
这一步是过滤出满足条件的文件,大概思路是根据列的统计信息来过滤出不存在的文件,从而山选出可能存在的文件,val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)
得到具体的文件路径val data = Dataset.ofRows(sparkSession, newTarget)
这一步是替换了fileIndex之后的datasetval filesToRewrite = ...
是得到需要重写的文件
val baseRelation = buildBaseRelation( sparkSession, txn, "delete", deltaLog.dataPath, filesToRewrite, nameToAddFileMap) // Keep everything from the resolved target except a new TahoeFileIndex // that only involves the affected files instead of all files. val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location) val targetDF = Dataset.ofRows(sparkSession, newTarget) val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral)) val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length) val (changeFiles, rewrittenFiles) = rewrittenActions .partition(_.isInstanceOf[AddCDCFile]) ... val operationTimestamp = System.currentTimeMillis() removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ rewrittenActions
-
val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length)
会根据每一行是否满足条件来增加一个名为“_change_type”值为“delete”的列:baseData .filter(numTouchedRowsUdf()) .withColumn( CDC_TYPE_COLUMN_NAME, new Column( If(filterCondition, typedLit[String](CDC_TYPE_NOT_CDC).expr, lit(CDC_TYPE_DELETE).expr) ) ... txn.writeFiles(dfToWrite)
-
这里的
txn.writeFiles(dfToWrite)
会进行根据是否存在“_change_type” 列来插入名为“__is_cdc”, 值为“true或false”的列,同时也会增加分区列“__is_cdc”
注意在txn.writeFiles(dfToWrite)
如下代码块中:val committer = getCommitter(outputPath)
会得到
DelayedCommitProtocol
对象,这个对象里的newTaskTempFile
方法,会对CDC的数据做额外处理:else if (subDir.startsWith(cdcPartitionTrue)) val cleanedSubDir = cdcPartitionTrueRegex.replaceFirstIn(subDir, CDC_LOCATION) new Path(cleanedSubDir, filename)
其中CDC_LOCATION的值为“_change_data”,这样所有有变化的数据就存在了“_change_data”目录下了.
-
在
writeFiles
方法最后会返回两种元数据文件,如下:val resultFiles = committer.addedStatuses.map a => a.copy(stats = optionalStatsTracker.map( _.recordedStats(new Path(new URI(a.path)).getName)).getOrElse(a.stats)) resultFiles.toSeq ++ committer.changeFiles
addedStatuses
返回的是AddFile
文件,这表明是没有改动的文件,changeFiles
返回的是AddCDCFile
文件,这表明的是满足条件的需要被delete的文件
其实这两种文件的区分最终还是在commitTask
方法中,
commitTask
中的buildActionFromAddedFile
方法会根据__is_cdc=true
来区分AddCDCFile
和AddFile
文件, 如果存在,则是AddCDCFile
,否则是AddFile
,代码如下:
最终会被commitTask方法调用,最终传递到Driver端,这样rewrite就会返回两种元数据文件。请注意这里并还没有进行元数据的操作,真正的元数据的操作在val partitioning = f._1.filter case (k, v) => k != CDC_PARTITION_COL f._1.get(CDC_PARTITION_COL) match case Some("true") => val partitioning = f._1.filter case (k, v) => k != CDC_PARTITION_COL AddCDCFile(f._2, partitioning, stat.getLen) case _ => val addFile = AddFile(f._2, partitioning, stat.getLen, stat.getModificationTime, true) addFile
txn.commit
中
-
-
val (changeFiles, rewrittenFiles) = rewrittenActions.partition(_.isInstanceOf[AddCDCFile])
根据两种元数据文件的不同进行metrics级别的记录 -
removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ rewrittenActions
这一步主要是在元数据层面删除根据条件过滤出来的数据,因为该数据已经根据用户的条件已经处理完了。
结论
根据以上的分析,可以知道目前的CDF只是在Delte层级做了反馈,如果说想要在Flink层达到CDC的效果,还得有个中间层,把delta里的CDF的数据给读取出来,转换Flink 内部形式的ChangelogMode CDC格式(比如说INSERT("+I", (byte) 0)
,DELETE("-D", (byte) 3)
)
以上是关于Delta Lake中CDC的实现的主要内容,如果未能解决你的问题,请参考以下文章