在 torch.distributed 中,如何正确平均不同 GPU 上的梯度?

Posted

技术标签:

【中文标题】在 torch.distributed 中,如何正确平均不同 GPU 上的梯度?【英文标题】:In torch.distributed, how to average gradients on different GPUs correctly? 【发布时间】:2020-02-28 11:59:39 【问题描述】:

在torch.distributed中,如何正确平均不同GPU上的梯度?

修改自https://github.com/seba-1511/dist_tuto.pth/blob/gh-pages/train_dist.py,下面的代码可以成功使用两个GPU(可以用nvidia-smi检查)。

但难以理解的一点是,下面的“average_gradients”是否确实是在两个 GPU 上的两个模型上平均梯度的正确方法。像下面的代码一样,使用两个进程运行的两个“model = Net()”是两个不同 GPU 上的两个模型,但是“average_gradients(model)”行只是一个 GPU 上模型的“平均”梯度,而不是两个两个 GPU 上的模型。

问题是下面的代码确实是在两个 GPU 上平均梯度的正确方法吗?如果是真的,如何阅读,如何理解代码?如果不是,在以下两个模型上平均梯度的正确方法是什么?

导入操作系统 进口火炬 导入 torch.distributed 作为 dist 将 torch.nn 导入为 nn 导入 torch.nn.functional 作为 F 将 torch.optim 导入为 optim 从数学导入 ceil 从随机导入随机 from torch.multiprocessing 导入过程 从 torchvision 导入数据集,转换 os.environ["CUDA_VISIBLE_DEVICES"] = "0,1" 类分区(对象): """ 类似数据集的对象,但只访问它的一个子集。""" def __init__(self, data, index): self.data = 数据 self.index = 索引 def __len__(self): 返回 len(self.index) def __getitem__(self, index): data_idx = self.index[索引] 返回 self.data[data_idx] 类数据分区器(对象): """ 将数据集划分为不同的块。""" def __init__(self, data, size=[0.7, 0.2, 0.1], seed=1234): self.data = 数据 self.partitions = [] rng = 随机() rng.seed(种子) data_len = len(数据) 索引 = [x for x in range(0, data_len)] rng.shuffle(索引) 对于大小的压裂: part_len = int(frac * data_len) self.partitions.append(indexes[0:part_len]) 索引 = 索引[part_len:] def 使用(自我,分区): 返回分区(self.data,self.partitions[partition]) 类网络(nn.Module): """网络架构。""" def __init__(self): 超级(网络,自我).__init__() self.conv1 = nn.Conv2d(1, 10, kernel_size=5) self.conv2 = nn.Conv2d(10, 20, kernel_size=5) self.conv2_drop = nn.Dropout2d() self.fc1 = nn.Linear(320, 50) self.fc2 = nn.Linear(50, 10) def 前向(自我,x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) x = x.view(-1, 320) x = F.relu(self.fc1(x)) x = F.dropout(x, training=self.training) x = self.fc2(x) 返回 F.log_softmax(x) def partition_dataset(): """ 分区 MNIST """ 数据集 = 数据集.MNIST( '。/数据', 火车=真, 下载=真, 变换=变换。撰写([ transforms.ToTensor(), transforms.Normalize((0.1307, ), (0.3081, )) ])) 大小 = dist.get_world_size() bsz = int(256 / float(大小)) partition_sizes = [1.0 / _ in range(size) 的大小] 分区 = DataPartitioner(数据集,partition_sizes) 分区 = partition.use(dist.get_rank()) train_set = torch.utils.data.DataLoader( 分区,batch_size=bsz,shuffle=True) 返回 train_set, bsz def 平均梯度(模型): """梯度平均。""" 大小 = 浮动(dist.get_world_size()) 对于 model.parameters() 中的参数: dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM) param.grad.data /= 大小 def 运行(等级,大小): """ 分布式同步 SGD 示例 """ # print("107 尺寸 = ", 尺寸) # print("dist.get_world_size() = ", dist.get_world_size()) ## 2 torch.manual_seed(1234) train_set, bsz = partition_dataset() device = torch.device("cuda:".format(rank)) 模型 = 净() 模型 = 模型.to(设备) 优化器 = optim.SGD(model.parameters(), lr=0.01, 动量=0.5) num_batches = ceil(len(train_set.dataset) / float(bsz)) 对于范围内的纪元(10): epoch_loss = 0.0 对于数据,train_set 中的目标: # 数据,目标 = 变量(数据),变量(目标) # 数据, 目标 = 变量(data.cuda(rank)), 变量(target.cuda(rank)) 数据,目标 = data.to(设备),target.to(设备) 优化器.zero_grad() 输出 = 模型(数据) 损失= F.nll_loss(输出,目标) epoch_loss += loss.item() loss.backward() 平均梯度(模型) 优化器.step() print('排名', dist.get_rank(), ', 纪元 ', 纪元, ': ', epoch_loss / num_batches) # 如果纪元 == 4: # 从实用程序导入 module_utils # module_utils.save_model() def init_processes(rank, size, fn, backend='gloo'): """初始化分布式环境。""" os.environ['MASTER_ADDR'] = '127.0.0.1' os.environ['MASTER_PORT'] = '29500' dist.init_process_group(后端,rank=rank,world_size=size) fn(等级,大小) 如果 __name__ == "__main__": 大小 = 2 进程 = [] 对于范围(大小)中的排名: p = 进程(目标 = 初始化进程,参数 =(排名,大小,运行)) p.start() processes.append(p) 对于进程中的 p: p.join()

【问题讨论】:

你终于弄明白了吗? 我的解决方案是使用 DistributedDataParallel 而不是 DataParallel,如下所示。经过一些调查,还发现 DataParallel 的性能比 DistributedDataParallel 差很多,因此 DataParallel 的例子可以忽略。 【参考方案1】:

我的解决方案是使用 DistributedDataParallel 而不是 DataParallel,如下所示。

代码

for param in self.model.parameters():
    torch.distributed.all_reduce(param.grad.data)

可以成功运行。

class DDPOptimizer:
    def __init__(self, model, torch_optim=None, learning_rate=None):
        """
        :param parameters:
        :param torch_optim: like torch.optim.Adam(parameters, lr=learning_rate, eps=1e-9)
            or optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
        :param is_ddp:
        """
        if torch_optim is None:
            torch_optim = torch.optim.Adam(model.parameters(), lr=3e-4, eps=1e-9)

        if learning_rate is not None:
            torch_optim.defaults["lr"] = learning_rate

        self.model = model
        self.optimizer = torch_optim

    def optimize(self, loss):
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.model.parameters():
            torch.distributed.all_reduce(param.grad.data)

        self.optimizer.step()
    pass

def run():
    """ Distributed Synchronous SGD Example """

    module_utils.initialize_torch_distributed()
    start = time.time()

    train_set, bsz = partition_dataset()
    model = Net()

    local_rank = torch.distributed.get_rank()
    device = torch.device("cuda", local_rank)
    model = model.to(device)

    sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    optimizer = DDPOptimizer(model, torch_optim=sgd)

    # optimizer = NoamOptimizerDistributed(100, 1, 10, model)

    num_batches = math.ceil(len(train_set.dataset) / float(bsz))

    epoch, end_epoch = 1, 10

    while epoch <= end_epoch:
        epoch_loss = 0.0
        for data, target in train_set:
            data, target = data.to(device), target.to(device)

            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            optimizer.optimize(loss)

        print('Rank ', dist.get_rank(), ', epoch ', epoch, ': ', epoch_loss / num_batches)
        # if epoch % 6 == 0:
        #     if local_rank == 0:
        #         module_utils.save_model(model, "a.pt")
        epoch += 1

    print("Time take to train: ", time.time() - start)

【讨论】:

谢谢你,汤姆。请问您是如何处理验证功能的?它是分布式的还是您在单台机器上运行? 嗨,Pooria,我在一台机器上训练 DPP。关于验证,我处理得不是很好,只是检查不同本地等级的验证结果或损失。例如,检查 2-4 损失是否接近并且看起来都不错。【参考方案2】:

在单台机器上运行 DPP。而关于验证,我处理的不是很好,只是检查验证结果。

【讨论】:

以上是关于在 torch.distributed 中,如何正确平均不同 GPU 上的梯度?的主要内容,如果未能解决你的问题,请参考以下文章

torch.distributed.barrier() 是如何工作的

将 torch.distributed.rpc.rpc_aync 放在不同的 .py 文件时,结果会有所不同

torch.distributed.init_process_group(‘gloo’, init_method=‘file://tmp/somefile’, rank=0, world_size=1

AttributeError: module ‘torch.distributed‘ has no attribute ‘_all_gather_base‘

AttributeError: module ‘torch.distributed‘ has no attribute ‘_all_gather_base‘

GETTING STARTED WITH DISTRIBUTED DATA PARALLE