Check-N-Run: a Checkpointing System for Training Deep Learning Recommendation Models | NSDI‘ 22

Posted 6+2 :)

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Check-N-Run: a Checkpointing System for Training Deep Learning Recommendation Models | NSDI‘ 22相关的知识,希望对你有一定的参考价值。

分布式机器学习论文分享 | Check-N-Run: a Checkpointing System for Training Deep Learning Recommendation Models | NSDI’ 22

这次和大家分享一篇刚刚放出的论文,来自于2022年NSDI (USENIX Symposium on Networked Systems Design and Implementation, CCF A类) 。该论文由Facebook主导,首次(据作者所述)针对大规模工业训练深度学习推荐系统的场景提出了一个检查点系统,在不损害测试准确率的情况下节省存储空间、存储带宽和网络带宽。该系统具有增量化动态量化的设计,在大规模真实工业负载的实验中,该系统可以减少6-17x 带宽和__2.5-8x__ 存储空间。

图1 论文作者

😜作者简介

作者都是来自Facebook的,其中通讯作者Murali Annavaram还是USC (University of Southern California) 的教授。一作是Assaf Eisenman,近几年常在NSDI收录名单上看到他,现在他去了斯坦佛大学。

😜预备知识

为了使所有读者更好地理解论文的动机和核心设计,本节我们简要介绍一些有关推荐模型、检查点和检查点系统的预备知识。

推荐模型

推荐系统根据用户过去的行为以及物品之间的联系等信息,为用户推荐更适合用户的物品。推荐系统的核心是训练大规模的推荐模型(可达TB级别),一般需要训练几天。值得一提的是,训练深度的推荐模型占用了Facebook工业集群超过80%的推理周期和超过50%的训练周期。

输入到推荐模型的原始数据包括稠密的特征信息(例如用户常购买的物品的信息)和海量稀疏的特征信息(例如用户不常购买的物品的信息),稠密的数据适用于一般的机器学习模型(例如多层感知机MLP),但如果直接用稀疏的特征数据训练机器学习模型,将吃力不讨好。因此对于海量的稀疏特征信息,推荐模型常使用embedding层,将高维稀疏的特征数据映射到低维空间中,得到稠密的表征信息。

图2 典型的推荐模型

检查点

检查点类似于电子游戏里的存档点,在机器学习中指的是某个时间点对训练过程的一次快照,保存在非易失存储中,主要包括模型各层的参数、优化器的状态、读取器的状态以及一些相关的指标(例如训练损失值和验证集准确率)。在真实工业场景下,检查点可用于快速从故障中恢复训练,以避免重新训练,浪费大量的资源。检查点还可用于在线训练(在训练的同时为用户提供推理服务)和迁移学习。

解耦的检查点系统

作者搭的检查点系统有多个训练集群,每个训练集群包括16个节点,每个节点有8个GPU和多核CPU,为了支持高性能训练,作者将训练集群解耦,集群里的GPU用来训练多个并行的复杂工业作业,CPU使用多进程并行处理多个模型快照,这种解耦式的训练集群创建一个检查点的时间几乎对训练速度没有影响(按照作者的实验,创建一个检查点只需要不到0.4%的训练时间)。同时,作者将读取数据和写入检查点到非易失存储的过程与训练过程解耦,将读取器和检查点存储在不同的集群中,进一步提高了训练的性能。

图3 解耦的检查点系统

😜研究动机

训练失败

由于在Facebook的检查点系统中读取集群、训练集群和检查点集群是分散的,很难保证训练过程中的每一个步骤不会出错。作者统计了包括21个训练集群的系统中,一个月的训练失败作业的训练失败时间(开始训练至训练失败的时间)。 由图4所示,有效的检查点系统是训练正常进行的关键,尤其是对于长期运行的训练作业,如果缺少有效的检查点,将会花费大量的时间重新训练,甚至训练任务可能永远不会完成。

图4 训练失败时间

愈加庞大的推荐模型

作者发现过去两年Facebook训练的推荐模型大小增加了3×以上,越大的模型将会导致训练在集群中更分散,训练失败的概率会更高。同时不断增大的TB级别的推荐模型会导致检查点面临存储容量和带宽的瓶颈。

模型部分更新

作者观察Facebook最大的推荐模型在给定的训练时间内,只有一小部分的embedding向量被更新。图5显示了随训练样本数量的增加,模型更新部分占总模型大小的变化函数。从三个不同的初始点开始,该变化函数呈现相同的趋势。图六显示在给定的时间内,模型更新大小几乎不变。以上数据都表明,每次迭代中只有部分模型(部分embedding向量)更新。

图5 模型更新大小-训练样本数目

图6 模型更新大小-训练时间

😜核心设计

Check-N-Run是一个分布式检查点系统,支持对推荐模型进行大规模长期高性能的训练。图7展示了该系统各组件及其功能,核心为训练集群中的CPU上运行的组件。检查点的创建进程包括三个步骤(1)创建训练状态的内存快照(2)构建一个优化的检查点(3)将检查点写入存储,通常是检查点集群中的存储。这三个步骤都交由训练集训中的CPU来完成,GPU除了用于加速训练只用于检测哪些embedding向量变化了,这种解耦式设计尽可能减小了检查点对训练速度的影响。接下来我们介绍一下论文的核心设计,如何优化检查点的构建过程,即上述的第二步。

图7 Check-N-Run各组件及其功能

增量化

由于推荐模型训练过程中每次迭代只有部分模型更新,因此作者引入了增量检查点,并提出了3种类型的增量检查点,分别是One-Shot Incremental Checkpoint,Consecutive Incremental Checkpoint和Intermittent Incremental Checkpoint。



图8 三种增量检查点(个人创作,仅作参考)

第一种One-Shot检查点,第一次记录一次包括所有embedding向量的完整基线,在这之后每个增量检查点只记录自基线以来所有修改的部分。如果需要从检查点恢复至训练状态,只需要同时读取基线和相应的单个时间点的增量检查点。

第二种Consecutive检查点,也记录一次完整基线,在这之后每个增量检查点只记录自上个检查点以来修改的部分。这种增量检查点一次只需要记录较少的部分,但如果需要从检查点恢复,需要同时读取基线以及自基线以来所有的增量检查点。因此这种类型的增量检查点实际更耗费存储空间,因为不同增量检查点很可能具有相同部分的副本,而任一增量检查点都不能删除。但Consecutive增量检查点适用于online训练。

第三种Intermittent增量检查点是第一种的改进版本,也是这篇论文所采取的设计。其改进的地方在于,一段时间后,系统会更新新的包括所有embedding向量的完整基线,这样后续的增量检查点都只需要维护较小的增量视图。作者提出了一个基于过去信息的增量大小预测器,当过去一段时间的增量累积量≈预测的未来一段时间的增量累积量时,Check-N-Run会创建新的完整基线,而不是创建增量检查点。

动态量化

量化(Quantization)原是信号处理的一种方法,在机器学习中被认为是一种有效的模型压缩方法。若把量化应用于创建检查点(本文讨论的是对embedding向量进行量化,即量化粒度为向量),可大幅度减小检查点的大小,从而节省存储空间和带宽,然而使用被量化后的检查点恢复至训练状态,可能会损失训练的精度。

作者致力于将由于量化导致的精度衰减限制在工业允许范围内(<0.01%),即几乎不损失精度。作者讨论了均匀量化和非均匀量化,认为非对称的均匀量化在工业上最可行,并认为量化所导致的精度损失主要来源于xmin与xmax的设置,因此作者提出了一种自适应的贪心搜索算法来高效地搜索次佳的设置。


图9 量化过程和贪心搜索算法单次迭代(个人创作,仅作参考)

😜实验结果

实验所采用的训练集群类似于Nvidia HGX,详细信息可见https://images.nvidia.com/content/pdf/hgx2-datasheet.pdf

由于读取增量检查点时实际上读取的是完整的模型,因此Check-N-Run所造成的精度损失只会来源于量化,论文内的 Figure 14 说明梯度衰减与量化的幅度和消耗检查点的次数有关,Check-N-Run使用一种自适应的方法根据消耗检查点的次数寻找量化的幅度,使得精度衰减可以一直维持在<0.01%的范围内。

论文内的 Figure 15 和 Figure 16 分别说明了Intermittent增量检查点在带宽和存储空间上的优势。其中 Figure 15 第8个time interval内更新了新的完整基线,因此代表Intermittent增量检查点的红色直方突出来了。

论文内的 Figure 17 则说明了Check-N-Run的总体作用,即我们开头所讲的减少6-17x 带宽和2.5-8x 存储空间。



图10 实验结果图

😜讨论

减少分布式系统中各组件的耦合是提高性能的关键。

要维护分布式系统的一致性,应确保操作的原子性。

深度学习模型确实是增量更新的,每次迭代只更新一部分。若我们能捕捉每次部分更新的规律,就会提高我们对深度学习模型的理解。

Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解(DT大数据梦工厂)

内容:

1、Checkpoint重大价值;

2、Checkpoint运行原理图;

3、Checkpoint源码解析;

机器学习、图计算稍微复杂迭代算法的时候都有Checkpoint的身影,作用不亚于persist

==========Checkpoint到底是什么============

1、Spark 在生产环境下经常会面临transformation的RDD非常多(例如一个Job中包含1万个RDD)或者具体transformation的RDD本身计算特别复杂或者耗时(例如计算时长超过1个小时),这个时候就要考虑对计算结果数据的持久化;

2、Spark是擅长多步骤迭代的,同时擅长基于Job的复用,这个时候如果能够对曾经计算的过程产生的数据进行复用,就可以极大的提升效率;

3、如果采用persist把数据放在内存中,虽然是快速的,但是也是最不可靠的;如果把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏,系统管理员可能清空磁盘

4、Checkpoint的产生就是为了相对而言更加可靠的持久化数据,在Checkpoint的时候可以指定把数据放在本地,并且是多副本的方式,但是在生产环境下是放在HDFS上,这就天然的借助了HDFS高容错、高可靠的特征来完成了最大化的可靠的持久化数据的方式;

5、Checkpoint是为了最大程度保证绝对可靠的复用RDD计算数据的Spark高级功能,通过checkpoint我们通常把数据持久化到HDFS来保证数据最大程度的安全性;

6、Checkpoint就是针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)开始基于HDFS等的数据持久化复用策略,通过对RDD启动checkpoint机制来实现容错和高可用;

加入进行一个1万个步骤,在9000个步骤的时候persist,数据还是有可能丢失的,但是如果checkpoint,数据丢失的概率几乎为0。

==========Checkpoint原理机制============

1、在SparkContext中设置进行Checkpoint操作的RDD,把数据放在哪里,在生产集群中运行,必须是HDFS的路径,同时为了提高效率,在进行Checkpoint的时候,可以指定很多目录;

/**
 * Set the directory under which RDDs are going to be checkpointed. The directory must
 * be a HDFS path if running on a cluster.
 */
def setCheckpointDir(directory: String) {

  // If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Checkpoint directory must be non-local " +
      "if Spark is running on a cluster: " + directory)
  }

  checkpointDir Option(directory).map { dir =>
    val path = new Path(dirUUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}

2、RDD中的checkpoint,在进行这个操作的时候,其所依赖的所有的RDD都会从计算联调中清空掉,保存在之前设置的路径下,且所有parent级别的RDD都会被清空掉;

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDirand all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData Some(new ReliableRDDCheckpointData(this))
  }
}

3、作为最佳实践,一般在进行checkpoint方法,调用前通常都要进行persist来把当前RDD的数据持久化到内存或者磁盘上(上面代码的注释说只会存在内存中说法不准确),因为checkpoint是lazy级别的,必须要有Job的执行,且在Job执行完成之后才会从后往前回溯哪个RDD进行了checkpoint标记,然后对该标记了要进行Checkpoint的RDD新启动一个Job执行具体的Checkpoint的过程

4、因为会从计算链条清空,所以Checkpoint改变了RDD的Lineage(血统关系);

5、当我们调用了checkpoint对RDD进行checkpoint操作的话,此时框架会自动生成RDDCheckpointData,当RDD上运行过一个Job后,就会立即触发RDDCheckpoint中的checkpoint方法,在其内部会调用doCheckpoint,实际上在生产环境下会调用ReliableRDDCheckpointData的doCheckpoint,在生产环境下会导致ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用,而在writeRDDToCheckpointDirectory方法内部会触发runJob来执行把当前的RDD中的数据写到checkpoint的目录中,同时会产生ReliableCheckpointRDD实例返回;

/**
 * This class contains all the information related to RDD checkpointing. Each instance of this
 * class is associated with a RDD. It manages process of checkpointing of the associated RDD,
 * as well as, manages the post-checkpoint state by providing the updated partitions,
 * iterator and preferred locations of the checkpointed RDD.
 */
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  protected var cpState Initialized

  // The RDD that contains our checkpointed data
  private var cpRDD: Option[CheckpointRDD[T]] = None

  // TODO: are we sure we need to use a global lock in the following methods?

  /**
   * Return whether the checkpoint data for this RDD is already persisted.
   */
  def isCheckpointedBoolean = RDDCheckpointData.synchronized {
    cpState == Checkpointed
  }

  /**
   * Materialize this RDD and persist its content.
   * This is called immediately after the first action invoked on this RDD has completed.
   */
  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState CheckpointingInProgress
     else {
        return
      }
    }

    val newRDD = doCheckpoint()

    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD Some(newRDD)
      cpState Checkpointed
      rdd.markCheckpointed()
    }
  }

/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rddcpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints"false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDDrdd.id)
    }
  }

  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}

/**
 * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
 */
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {

  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toStringbroadcastedConf) _)

  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(scoriginalRDD.partitioner.getcheckpointDirPath)
  }

  val newRDD = new ReliableCheckpointRDD[T](
    sccheckpointDirPath.toStringoriginalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
        s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
  }
  newRDD
}

技术分享

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]


本文出自 “一枝花傲寒” 博客,谢绝转载!

以上是关于Check-N-Run: a Checkpointing System for Training Deep Learning Recommendation Models | NSDI‘ 22的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFailed to create checkpoint storage at checkpoint coordinator side

hadoop 异常 Inconsistent checkpoint fields

mysql 之 checkpoint和LSN详解

flink checkpoint

Flink 与rabbitmq集成 并开启checkpoint

FLink Checkpoint 介绍