horovod使用学习之一 -- hvd.DistributedOptimizer(optimizer)

Posted lixiaolun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了horovod使用学习之一 -- hvd.DistributedOptimizer(optimizer)相关的知识,希望对你有一定的参考价值。

horovod使用学习方式参考:https://github.com/uber/horovod#usage

To use Horovod, make the following additions to your program:
添加下述代码到程序中,就可以使用Horovod来分布式运行tensorflow程序:

  1. Run hvd.init().
    添加hvd.init()

  2. Pin a server GPU to be used by this process using session_config.gpu_options.visible_device_list. With the typical setup of one GPU per process, this can be set to local rank. In that case, the first process on the server will be allocated the first GPU, second process will be allocated the second GPU and so forth.
    使用session_config.gpu_options.visible_device_list指定要使用机器的哪个GPU。如果想让一个进程使用一个GPU,该值可以设置为local rank,
    例如:session_config.gpu_options.visible_device_list = str(hvd.local_rank())。
    这样设置的话,机器上的第一个GPU就被分配给了第一个进程;第二个GPU就被分配给了第二个进程;依此类推。

  3. Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.
    学习率需要放到到原来的n倍,n等于worker的个数。同步分布式训练的实际批的大小也根据worker个数扩容,学习率的增长要和批大小的增长保持一致。
    例如:opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())

  4. Wrap optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.
    在hvd.DistributedOptimizer中封装一个tf.optimizer。 DistributedOptimizer会先调用被封装的optimizer的compute_gradient()方法,然后使用allreduce或者allgather获得梯度并计算梯度的均值,然后再应用平均梯度。
    本节详细介绍hvd.DistributedOptimizer。

  5. Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you‘re not using MonitoredTrainingSession, you can simply execute the hvd.broadcast_global_variables op after global variables have been initialized.
    使用hvd.BroadcastGlobalVariablesHook(0)可以从rank 0向其他所有进程广播初始变量的值。以此来保证在训练开始或者从checkpoint恢复时,所有worker上初始值是相同的。
    或者,如果没有使用MonitoredTrainingSession,可以在所有全局变量初始化之后,直接执行hvd.broadcast_global_variables操作来广播初始值。

  6. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0.
    只有在rank 0上需要保存checkpoint,例如: checkpoint_dir = ‘./checkpoints‘ if hvd.rank() == 0 else None。

Example (see the examples directory for full training examples):

import tensorflow as tf
import horovod.tensorflow as hvd


# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = ‘/tmp/train_logs‘ if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)

  

本节就重点学习hvd.DistributedOptimizer类。

# DistributedOptimizer继承了tf.train.Optimizer
class DistributedOptimizer(tf.train.Optimizer):
    """An optimizer that wraps another tf.Optimizer, using an allreduce to
    average gradient values before applying gradients to model weights.

       DistributedOptimizer封装了另外一个tf.Optimizer,在模型应用梯度之前
    使用allreduce操作收集梯度值并求其均值。
    """

    def __init__(self, optimizer, name=None, use_locking=False, device_dense=‘‘,
                 device_sparse=‘‘):
        """Construct a new DistributedOptimizer, which uses another optimizer
        under the hood for computing single-process gradient values and
        applying gradient updates after the gradient values have been averaged
        across all the Horovod ranks.

        Args:
          optimizer:
            Optimizer to use for computing gradients and applying updates.
          name:
            Optional name prefix for the operations created when applying
            gradients. Defaults to "Distributed" followed by the provided
            optimizer type.
          use_locking:
            Whether to use locking when updating variables.
            See Optimizer.__init__ for more info.
          device_dense:
            Device to be used for dense tensors. Uses GPU by default
            if Horovod was build with HOROVOD_GPU_ALLREDUCE.
          device_sparse:
            Device to be used for sparse tensors. Uses GPU by default
            if Horovod was build with HOROVOD_GPU_ALLGATHER.
        """
        if name is None:
            name = "Distributed{}".format(type(optimizer).__name__)

        self._optimizer = optimizer
        self._device_dense = device_dense
        self._device_sparse = device_sparse
        super(DistributedOptimizer, self).__init__(
            name=name, use_locking=use_locking)

    def compute_gradients(self, *args, **kwargs):
        """Compute gradients of all trainable variables.

        See Optimizer.compute_gradients() for more info.

        In DistributedOptimizer, compute_gradients() is overridden to also
        allreduce the gradients before returning them.
        """
        # self._optimizer表示原始的optimizer
        # 调用其compute_gradients()方法来计算所有训练参数的梯度
        # compute_gradients()方法返回一个元祖(gradient,variable)的列表
        gradients = self._optimizer.compute_gradients(*args, **kwargs)
        # size()表示worker的个数,如果size() > 1,表示分布式运行
        if size() > 1:
            # 分布式运行需要计算平均梯度值
            averaged_gradients = []
            with tf.name_scope(self._name + "_Allreduce"):
                # 遍历元祖(gradient,variable)的列表
                for grad, var in gradients:
                    if grad is not None:
                        # 使用allreduce()与其他worker同步grad
                        # allreduce():Perform an allreduce on a tf.Tensor or tf.IndexedSlices
                        avg_grad = allreduce(grad, device_dense=self._device_dense,
                                             device_sparse=self._device_sparse)
                        # 将同步后的avg_grad添加到列表中
                        averaged_gradients.append((avg_grad, var))
                    else:
                        averaged_gradients.append((None, var))
            return averaged_gradients
        else:
            return gradients

    def apply_gradients(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.apply_gradients(*args, **kwargs)

    def get_slot(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.get_slot(*args, **kwargs)

    def get_slot_names(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.get_slot_names(*args, **kwargs)

    def variables(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.variables(*args, **kwargs)

 

以上是关于horovod使用学习之一 -- hvd.DistributedOptimizer(optimizer)的主要内容,如果未能解决你的问题,请参考以下文章

[源码解析] 深度学习分布式训练框架 horovod --- 后台线程架构

LF DL的Horovod项目增加了对PySpark和Apache MXNet的支持以及其他功能,以加快培训速度

ubuntu 配置 Horovod环境

如何修复:horovod.run.common.util.network.NoValidAddressesFound

CoRR 2018 | Horovod: Fast and Easy Distributed Deep Learning in Tensorflow

Tensorflow镜像策略和Horovod分布策略