Delta Lake中CDC的实现

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Delta Lake中CDC的实现相关的知识,希望对你有一定的参考价值。

背景

本文基于delta 2.0.0
Delta是通过CDF(change data feed)来实现CDC(change data capture)。
CDF是能让表能够输出数据表变化的能力,CDC是能够捕获和识别数据的变化,并能够将变化的数据交给下游做进一步的处理。
我们来分析一下是怎么做到数据行级别的CDF的

分析

在设置delta.enableChangeDataFeed= true的前提下(在Enable change data fedd有提及),我们分析一下逻辑计划DeltaDelete对应的RunnableCommand DeleteCommand的Run方法:

 final override def run(sparkSession: SparkSession): Seq[Row] = 
    recordDeltaOperation(deltaLog, "delta.dml.delete") 
      deltaLog.assertRemovable()
      deltaLog.withNewTransaction  txn =>
        val deleteActions = performDelete(sparkSession, deltaLog, txn)
        if (deleteActions.nonEmpty) 
          txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
        
      
      // Re-cache all cached plans(including this relation itself, if it's cached) that refer to
      // this data source relation.
      sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)
    

    Seq.empty[Row]
  

最重要的方法是performDelete,这里的performDelete方法会根据condition来做不同的操作:

  • 如果没有条件,那就是全部删除:
    case None =>
      // Case 1: Delete the whole table if the condition is true
      val allFiles = txn.filterFiles(Nil)

      numRemovedFiles = allFiles.size
      scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
      val (numBytes, numPartitions) = totalBytesAndDistinctPartitionValues(allFiles)
      numBytesRemoved = numBytes
      numFilesBeforeSkipping = numRemovedFiles
      numBytesBeforeSkipping = numBytes
      numFilesAfterSkipping = numRemovedFiles
      numBytesAfterSkipping = numBytes
      if (txn.metadata.partitionColumns.nonEmpty) 
        numPartitionsAfterSkipping = Some(numPartitions)
        numPartitionsRemovedFrom = Some(numPartitions)
        numPartitionsAddedTo = Some(0)
      
      val operationTimestamp = System.currentTimeMillis()
      allFiles.map(_.removeWithTimestamp(operationTimestamp))

这里列举该表的所有文件,并把所有的文件标识为RemoveFile,并带上时间戳

  • 如果是有筛选条件,则会根据筛选条件进行行级别的记录变更(这里还会根据情况具体分析):

    1. 能够从delta的元数据能够囊括过滤的条件,则只是在元数据层面进行修改:
       case Some(cond) =>
         val (metadataPredicates, otherPredicates) =
           DeltaTableUtils.splitMetadataAndDataPredicates(
             cond, txn.metadata.partitionColumns, sparkSession)
    
         numFilesBeforeSkipping = txn.snapshot.numOfFiles
         numBytesBeforeSkipping = txn.snapshot.sizeInBytes
    
         if (otherPredicates.isEmpty) 
           // Case 2: The condition can be evaluated using metadata only.
           //         Delete a set of files without the need of scanning any data files.
           val operationTimestamp = System.currentTimeMillis()
           val candidateFiles = txn.filterFiles(metadataPredicates)
    
           scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
           numRemovedFiles = candidateFiles.size
           numBytesRemoved = candidateFiles.map(_.size).sum
           numFilesAfterSkipping = candidateFiles.size
           val (numCandidateBytes, numCandidatePartitions) =
             totalBytesAndDistinctPartitionValues(candidateFiles)
           numBytesAfterSkipping = numCandidateBytes
           if (txn.metadata.partitionColumns.nonEmpty) 
             numPartitionsAfterSkipping = Some(numCandidatePartitions)
             numPartitionsRemovedFrom = Some(numCandidatePartitions)
             numPartitionsAddedTo = Some(0)
           
           candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
         
    

    DeltaTableUtils.splitMetadataAndDataPredicates是判断过滤条件是否是在元数据层能够囊括,如果可以的话,就通过val candidateFiles = txn.filterFiles(metadataPredicates)来过滤出所有的文件,然后通过candidateFiles.map(_.removeWithTimestamp(operationTimestamp))在元数据层面进行删除.
    2. 如果不仅仅是从元数据层面进行过滤,则会过滤出所有满足条件的数据:

          val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)
          numFilesAfterSkipping = candidateFiles.size
          val (numCandidateBytes, numCandidatePartitions) =
            totalBytesAndDistinctPartitionValues(candidateFiles)
          numBytesAfterSkipping = numCandidateBytes
          if (txn.metadata.partitionColumns.nonEmpty) 
            numPartitionsAfterSkipping = Some(numCandidatePartitions)
          
          val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)
          val fileIndex = new TahoeBatchFileIndex(
            sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)
          // Keep everything from the resolved target except a new TahoeFileIndex
          // that only involves the affected files instead of all files.
          val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
          val data = Dataset.ofRows(sparkSession, newTarget)
          val deletedRowCount = metrics("numDeletedRows")
          val deletedRowUdf = udf  () =>
            deletedRowCount += 1
            true
          .asNondeterministic()
          val filesToRewrite =
            withStatusCode("DELTA", FINDING_TOUCHED_FILES_MSG) 
              if (candidateFiles.isEmpty) 
                Array.empty[String]
               else 
                data
                  .filter(new Column(cond))
                  .filter(deletedRowUdf())
                  .select(new Column(InputFileName())).distinct()
                  .as[String].collect()
              
            
    
          numRemovedFiles = filesToRewrite.length
          scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
    
    • val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates) 这一步是过滤出满足条件的文件,大概思路是根据列的统计信息来过滤出不存在的文件,从而山选出可能存在的文件,
    • val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) 得到具体的文件路径
    • val data = Dataset.ofRows(sparkSession, newTarget)这一步是替换了fileIndex之后的dataset
    • val filesToRewrite = ... 是得到需要重写的文件
     val baseRelation = buildBaseRelation(
     sparkSession, txn, "delete", deltaLog.dataPath, filesToRewrite, nameToAddFileMap)
      // Keep everything from the resolved target except a new TahoeFileIndex
      // that only involves the affected files instead of all files.
     val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
     val targetDF = Dataset.ofRows(sparkSession, newTarget)
     val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
     val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length)
     val (changeFiles, rewrittenFiles) = rewrittenActions
         .partition(_.isInstanceOf[AddCDCFile])
         ...
     val operationTimestamp = System.currentTimeMillis()
     removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++
     rewrittenActions   
    
    • val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length) 会根据每一行是否满足条件来增加一个名为“_change_type”值为“delete”的列:
    ```
         baseData
        .filter(numTouchedRowsUdf())
        .withColumn(
          CDC_TYPE_COLUMN_NAME,
          new Column(
            If(filterCondition, typedLit[String](CDC_TYPE_NOT_CDC).expr,
            lit(CDC_TYPE_DELETE).expr)
          )
       ...
       txn.writeFiles(dfToWrite)
    ```
     1. 这里的` txn.writeFiles(dfToWrite)` 会进行根据是否存在“_change_type” 列来插入名为“__is_cdc”, 值为“true或false”的列,同时也会增加分区列“__is_cdc”
     注意在`txn.writeFiles(dfToWrite)`如下代码块中:
      ```
      val committer = getCommitter(outputPath)
      ```
      会得到`DelayedCommitProtocol`对象,这个对象里的`newTaskTempFile`方法,会对CDC的数据做额外处理:
      ```
        else if (subDir.startsWith(cdcPartitionTrue)) 
         val cleanedSubDir = cdcPartitionTrueRegex.replaceFirstIn(subDir, CDC_LOCATION)
         new Path(cleanedSubDir, filename)
      ```
      其中*CDC_LOCATION*的值为“_change_data”,这样所有有变化的数据就存在了“_change_data”目录下了.
     2. 在`writeFiles`方法最后会返回两种元数据文件,如下:
      ```
       val resultFiles = committer.addedStatuses.map  a =>
       a.copy(stats = optionalStatsTracker.map(
         _.recordedStats(new Path(new URI(a.path)).getName)).getOrElse(a.stats))
       
    
      resultFiles.toSeq ++ committer.changeFiles
      ```
       - `addedStatuses`返回的是`AddFile`文件,这表明是没有改动的文件,
       - `changeFiles`返回的是`AddCDCFile`文件,这表明的是满足条件的需要被delete的文件
          其实这两种文件的区分最终还是在`commitTask`方法中,     
          `commitTask`中的`buildActionFromAddedFile`方法会根据`__is_cdc=true`来区分`AddCDCFile`和`AddFile`文件, 如果存在,则是`AddCDCFile`,否则是`AddFile`,代码如下:
           ```
           val partitioning = f._1.filter  case (k, v) => k != CDC_PARTITION_COL 
           f._1.get(CDC_PARTITION_COL) match 
            case Some("true") =>
              val partitioning = f._1.filter  case (k, v) => k != CDC_PARTITION_COL 
              AddCDCFile(f._2, partitioning, stat.getLen)
            case _ =>
              val addFile = AddFile(f._2, partitioning, stat.getLen, stat.getModificationTime, true)
              addFile
          ```
          最终会被commitTask方法调用,最终传递到Driver端,这样rewrite就会返回两种元数据文件。请注意这里并还没有进行元数据的操作,真正的元数据的操作在` txn.commit`中
    
    • val (changeFiles, rewrittenFiles) = rewrittenActions.partition(_.isInstanceOf[AddCDCFile])根据两种元数据文件的不同进行metrics级别的记录
    • removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ rewrittenActions
      这一步主要是在元数据层面删除根据条件过滤出来的数据,因为该数据已经根据用户的条件已经处理完了。

结论

根据以上的分析,可以知道目前的CDF只是在Delte层级做了反馈,如果说想要在Flink层达到CDC的效果,还得有个中间层,把delta里的CDF的数据给读取出来,转换Flink 内部形式的ChangelogMode CDC格式(比如说INSERT("+I", (byte) 0),DELETE("-D", (byte) 3))

以上是关于Delta Lake中CDC的实现的主要内容,如果未能解决你的问题,请参考以下文章

深入剖析 Delta Lake: MySQL CDC 实战

深入剖析 Delta Lake: MySQL CDC 实战

pyspark delta-lake 元存储

Delta Lake 版本管理(13)

Delta Lake 提供纯 ScalaJavaPython 操作 API,和 Flink 整合更加容易

Pyspark Delta Lake 捕获表不是 delta 表异常