简单介绍pytorch中分布式训练DDP使用 (结合实例,快速入门)

Posted 栋次大次

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了简单介绍pytorch中分布式训练DDP使用 (结合实例,快速入门)相关的知识,希望对你有一定的参考价值。

文章目录

DDP原理

DistributedDataParallel(DDP)支持多机多卡分布式训练。pytorch原生支持,本文简要总结下DDP的使用,多卡下的测试,并根据实际代码介绍。

voxceleb_trainer: 开源的声纹识别工具,简单好用,适合研究人员。

通俗理解:

  1. DDP模式会开启N个进程,每个进程在一张显卡上加载模型,这些模型相同(被复制了N份到N个显卡),缓解GIL锁的限制。
  2. 训练阶段,每个进程通过Ring-Reduce的方法与其他进程通讯(交换各自的梯度)
  3. 各个进程使用平均后的梯度更新自己的参数,因为每个进程下模型的初始参数、更新梯度是一样的,所以更新后模型的参数也保持一致。

DP模式出现的较早,支持单机多卡的训练,使用方法

model=torch.nn.DataParallel(model)

DP模式中只有一个进程,容易受到GIL的限制。master节点相当于参数服务器,向其他卡广播参数,在梯度反向传播后,每个卡将梯度汇总到master节点,master对梯度进行平均后更新参数,再将参数发送到其他卡上。

显而易见的,这种模式会导致节点的计算任务,通讯量很重,从而导致网络阻塞,降低训练速度。

强烈建议使用DDP

GIL是什么?为什么DDP更快?

GIL(全局解释器锁,可以参考GIL),主要的缺点就是:限制python进程只能利用一个CPU核心,不适合计算密集型的任务。使用多进程,才能有效利用多核的计算资源。DDP启动多进程,一定程度上避免了这个限制。

Ring-Reduce梯度合并:各个进程独立计算梯度,每个进程将梯度依次传给下一个进程,之后再把从上一个进程拿到的梯度传给下一个进程,循环n(进程数量)次之后,所有的进程就可以得到全部的梯度。

快的原因:每个进程只和自己上下游的两个进程进行通信,极大缓解了参数服务器的通讯阻塞现象。

通常讲,神经网络并行有三种:

  • Data parallelism: 数据并行,可以间接增大batch_size。一般常用的DP,DDP都是这种模式
  • Model parallelism: 模型并行,把模型放在不同的显卡上,计算是并行的。可能会加速,需要看实际的通信效率。
  • Workload Partitioning:把模型放在不同的显卡上,计算是串行的,不能加速。

pytorch中DDP使用

DDP推荐使用单进程单卡,就是一个模型放在一个卡上。

也可以单进程多卡。分配有三种情况:

  • 每个进程一张卡。(官方推荐的最佳模式)
  • 每个进程多张卡,复制模式。一个模型复制在不同的卡上,每个进程等同于DP模式。但速度不如单卡单进程,一般不采用
  • 每个进程多张卡,并行模式。一个模型的不同部分分布在不同的卡上。一般用在模型很大,一张卡塞不下batch_size=1的情况。

本文只介绍单卡单进程的情况。(实际没接触到大到一张卡塞不下的模型,小破实验室ε=ε=ε=┏(゜ロ゜;)┛)

相关的概念

先了解下相关的概念:

  • group,进程组。默认情况下只有一个组,

  • world size: 全局的并行数,

    torch.distributed.get_world_size()

  • rank: 表示当前进程的序号,用于进程间通讯。从0开始,rank=0的进程是master进程

    torch.distributed.get_rank()

  • local_rank: 每台机子上的进程的序号。

    一般情况下,用local_rank来手动设置模型是跑在当前机器的哪块GPU上。

    torch.distributed.local_rank()

使用流程

使用很简单,在代码中加入:

model = DDP(model, device_ids=[local_rank], output_device=local_rank)

原本的model是pytorch模型,新得到的model是DDP模型。

https://zhuanlan.zhihu.com/p/178402798

## main.py文件
import torch
import argparse

# 新增1:依赖
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# 新增2:从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数,后面还会介绍。所以不用考虑太多,照着抄就是了。
#       argparse是python的一个系统库,用来处理命令行调用,如果不熟悉,可以稍微百度一下,很简单!
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank

# 新增3:DDP backend初始化
#   a.根据local_rank来设定当前使用哪块GPU
torch.cuda.set_device(local_rank)
#   b.初始化DDP,使用默认backend(nccl)就行。如果是CPU模型运行,需要选择其他后端。
dist.init_process_group(backend='nccl')

# 新增4:定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前做哦。
#       如果要加载模型,也必须在这里做哦。
device = torch.device("cuda", local_rank)
model = nn.Linear(10, 10).to(device)
# 可能的load模型...

# 新增5:之后才是初始化DDP模型
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

除了模型部分,最重要的是数据的分发。简单来说,就是把数据集均分到不同的卡上,保证每个卡的数据不同(如果都拿整个数据,会出现冗余)。

pytorch中使用torch.utils.data.distributed.DistributedSampler实现数据的分发。

my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
# 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
#       sampler的原理,后面也会介绍。
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)


for epoch in range(num_epochs):
    # 新增2:设置sampler的epoch,DistributedSampler需要这个来维持各个进程之间的相同随机数种子
    trainloader.sampler.set_epoch(epoch)
    # 后面这部分,则与原来完全一致了。
    for data, label in trainloader:
        prediction = model(data)
        loss = loss_fn(prediction, label)
        loss.backward()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
        optimizer.step()

上边两个不做完,基本就可以进行多卡的训练。

模型保存:

# 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。
#    因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
# 2. 我只需要在进程0上保存一次就行了,避免多次保存重复的东西。
if dist.get_rank() == 0:
    torch.save(model.module, "saved_model.ckpt")

注意点:

  1. 理论上,在没有buffer参数(如BN)的情况下,DDP性能和单卡Gradient Accumulation性能是完全一致的。并行8就等于Gradient Accumulation Step为8的单卡。
  2. 速度上,DDP比Gradient Accumulation的单卡快

如何启动

有两种方法:1. torch.distributed.launch启动 2. torch.multiprocessing.spawn

torch.distributed.launch

介绍一些参数:

  • –nnodes 有多少台机器
  • –node_rank 当前是哪台机器
  • –nproc_per_node 每台机器有多少进程

实现方式:在每台机子上都运行一次torch.distributed.launch,每个torch.distributed.launch会启动n个进程,并给每个进程一个--local_rank=i的参数

单机模式:

## Bash运行
# 假设我们只在一台机器上运行,可用卡数是8
python -m torch.distributed.launch --nproc_per_node 8 main.py

多机模式:

–master_address: master进程的网络地址,默认是127.0.0.1(只用用于单机)

–master_port: master进程的一个端口,默认29500,使用前需要确认端口是否被其他程序占用。

## Bash运行
# 假设我们在2台机器上运行,每台可用卡数是8
#    机器1:
python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node 8 \\
  --master_adderss $my_address --master_port $my_port main.py
#    机器2:
python -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node 8 \\
  --master_adderss $my_address --master_port $my_port main.py

spawn调用方式

给出一个demo:

https://zhuanlan.zhihu.com/p/178402798

def demo_fn(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # lots of code.
    ...

def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

相比于launch,spawn使用起来更加复杂一点,但是封装的好,方便其他人直接使用。

DDP实现的原理和细节参考:https://zhuanlan.zhihu.com/p/187610959

针对实例voxceleb_trainer多卡介绍

voxceleb_trainer是一个开源的声纹识别工具,代码简洁。实现了多卡模式,基于spawn启动模式,简单看一下:

和上文介绍的流程类似:

首先设置地址,端口号,初始化进程组,并先将模型放置到单卡上,再封装为DDP模型。

if args.distributed:
    # 针对单机多卡
    # 设置本地ip
    os.environ['MASTER_ADDR']='localhost'
    # 端口号
    os.environ['MASTER_PORT']=args.port
	# 初始化进程组
    dist.init_process_group(backend='nccl', world_size=ngpus_per_node, rank=args.gpu)
    torch.cuda.set_device(args.gpu)
    # 模型传到GPU上
    s.cuda(args.gpu)

    # BN同步
    if args.syncBN:
    s = torch.nn.SyncBatchNorm.convert_sync_batchnorm(s)
    print('----syncBN----')
    s = torch.nn.parallel.DistributedDataParallel(s, device_ids=[args.gpu], find_unused_parameters=False)

    print('Loaded the model on GPU :d'.format(args.gpu))

以上封装在一个main_work函数中,其中数据加载变为:

Datasets->DistributedSampler->BatchSampler->DataLoader

train_dataset = train_dataset_loader(**vars(args))

train_sampler = train_dataset_sampler(train_dataset, **vars(args))
# 总的batch_size = args.batch_size * n_gpu (显卡数)
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=args.batch_size,
    num_workers=args.nDataLoaderThread,
    sampler=train_sampler,
    pin_memory=False,
    worker_init_fn=worker_init_fn,
    drop_last=True,
)

spawn启动:

torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')

  • fn: 传入的函数,定义main(rank, *args),在定义的时候,第一个参数留rank, 程序自动分配rank的值,告诉函数当前是在哪块GPU上进行的。
  • args: 传入的fn参数,tuple类型
  • nprocs: 进程个数
  • join:是否加入同一进程组

例如:

mp.spawn(main_worker, nprocs=n_gpus, args=(n_gpus, args))

看到这里点个大拇指,关注一下~

后边更新:DDP的一些细节

以上是关于简单介绍pytorch中分布式训练DDP使用 (结合实例,快速入门)的主要内容,如果未能解决你的问题,请参考以下文章

简单介绍pytorch中分布式训练DDP使用 (结合实例,快速入门)

[理论+实操] MONAI&PyTorch 如何进行分布式训练,详细介绍DP和DDP

[理论+实操] MONAI&PyTorch 如何进行分布式训练,详细介绍DP和DDP

[理论+实操] MONAI&PyTorch 如何进行分布式训练,详细介绍DP和DDP

在 DDP Pytorch Lightning 中跨 GPU 拆分训练数据

Pytorch DDP 分布式训练实例