[理论+实操] MONAI&PyTorch 如何进行分布式训练,详细介绍DP和DDP

Posted Tina姐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[理论+实操] MONAI&PyTorch 如何进行分布式训练,详细介绍DP和DDP相关的知识,希望对你有一定的参考价值。

优秀的炼丹师再也不满足单张GPU训练了,总想要迭代的再快点,batchsize再大点。能用3D model 绝不用2D,完了!内存超了,咋办?急, 在线等!!

文章目录

为什么要使用分布式训练

对于懒癌星人,单卡训练能解决的问题,再多卡给我,我都懒得用。但是对于资深炼丹师,分布式训练带来的收益很大,可以加速模型的训练、调参的节奏、以及版本的迭代更新等~

当你遇到以下情况,可以考虑使用分布式训练(使用多张卡,或者多台服务器)

  • 在理想的batc_size下,单卡训练 out of memory
  • 使用单卡虽然能run, 但是速度非常慢,耗时。
  • 数据量太大,需要利用所有资源满足训练。
  • 模型太大,单机内存不足
    等…

分布式训练有哪些方法

分布式训练策略按照并行方式不同,可以简单的分为数据并行模型并行两种方式。原文链接

1️⃣ 数据并行

数据并行是指在不同的 GPU 上都 copy 保存一份模型的副本,然后将不同的数据分配到不同的 GPU 上进行计算,最后将所有 GPU 计算的结果进行合并,从而达到加速模型训练的目的。由于数据并行会涉及到把不同 GPU 的计算结果进行合并然后再更新模型,根据跟新方式不同,又可以分为同步更新和异步更新

2️⃣ 模型并行

分布式训练中的模型并行是指将整个神经网络模型拆解分布到不同的 GPU 中,不同的 GPU 负责计算网络模型中的不同部分。这通常是在网络模型很大很大、单个 GPU 的显存已经完全装不下整体网络的情况下才会采用。

基于 Pytorch 的分布式训练方法

在 Pytorch 中为我们提供了两种多 GPU 的分布式训练方案: torch.nn.DataParallel(DP)torch.nn.parallel.Distributed Data Parallel(DDP)

DP(DataParallel)

原文链接

  • 优点:修改的代码量最少,只要像这样model = nn.DataParallel(model)包裹一下你的模型就行了
  • 缺点:只适用单机多卡,不适用多机多卡;性能不如DDP; DP使用单进程,目前官方已经不推荐。

如果觉得 DDP 很难,可以采用这种方式。示例用法

import monai
import torch.nn as nn
import os

os.environ("CUDA_VISIBLE_DEVICES") = '0,1' 
# 用哪些卡,就写哪些,也可以在命令行运行的时候指定可见GPU
# $: export CUDA_VISIBLE_DEVICES=0,1 python train.py
device = torch.device('cuda' if torch.cuda.is_available () else 'cpu')
model = monai.networks.nets.UNet().to(device)
model = nn.DataParallel(model)

通过两种方式可以指定需要使用的GPU,第一种是在代码里设置os.environ, 第二种是在终端运行代码前,加一句export CUDA_VISIBLE_DEVICES=0,1。按照自己的习惯来就行。

如果需要跑来看看效果,可以执行下面的代码,完整版

import argparse
import os
import sys
from glob import glob

import nibabel as nib
import numpy as np
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel

import monai
from monai.data import DataLoader, Dataset, create_test_image_3d, DistributedSampler
from monai.transforms import (
    AsChannelFirstd,
    Compose,
    LoadImaged,
    RandCropByPosNegLabeld,
    RandRotate90d,
    ScaleIntensityd,
    EnsureTyped,
)


def train(args):
    if not os.path.exists(args.dir):
        # create 40 random image, mask paris for training
        print(f"generating synthetic data to args.dir (this may take a while)")
        os.makedirs(args.dir)
        # set random seed to generate same random data for every node
        np.random.seed(seed=0)
        for i in range(200):
            im, seg = create_test_image_3d(128, 128, 128, num_seg_classes=1, channel_dim=-1)
            n = nib.Nifti1Image(im, np.eye(4))
            nib.save(n, os.path.join(args.dir, f"imgi:d.nii.gz"))
            n = nib.Nifti1Image(seg, np.eye(4))
            nib.save(n, os.path.join(args.dir, f"segi:d.nii.gz"))

    images = sorted(glob(os.path.join(args.dir, "img*.nii.gz")))
    segs = sorted(glob(os.path.join(args.dir, "seg*.nii.gz")))
    train_files = ["img": img, "seg": seg for img, seg in zip(images, segs)]

    # define transforms for image and segmentation
    train_transforms = Compose(
        [
            LoadImaged(keys=["img", "seg"]),
            AsChannelFirstd(keys=["img", "seg"], channel_dim=-1),
            ScaleIntensityd(keys="img"),
            RandCropByPosNegLabeld(
                keys=["img", "seg"], label_key="seg", spatial_size=[96, 96, 96], pos=1, neg=1, num_samples=4
            ),
            RandRotate90d(keys=["img", "seg"], prob=0.5, spatial_axes=[0, 2]),
            EnsureTyped(keys=["img", "seg"]),
        ]
    )

    # create a training data loader
    train_ds = Dataset(data=train_files, transform=train_transforms)
    # use batch_size=2 to load images and use RandCropByPosNegLabeld to generate 2 x 4 images for network training
    train_loader = DataLoader(
        train_ds,
        batch_size=10,
        shuffle=False,
        num_workers=2,
        pin_memory=True,
    )

    # create UNet, DiceLoss and Adam optimizer
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = monai.networks.nets.UNet(
        spatial_dims=3,
        in_channels=1,
        out_channels=1,
        channels=(16, 32, 64, 128, 256),
        strides=(2, 2, 2, 2),
        num_res_units=2,
    ).to(device)
    model = torch.nn.DataParallel(model)
    loss_function = monai.losses.DiceLoss(sigmoid=True).to(device)
    optimizer = torch.optim.Adam(model.parameters(), 1e-3)

    # start a typical PyTorch training
    epoch_loss_values = list()
    for epoch in range(5):
        print("-" * 10)
        print(f"epoch epoch + 1/5")
        model.train()
        epoch_loss = 0
        step = 0
        for batch_data in train_loader:
            step += 1
            inputs, labels = batch_data["img"].to(device), batch_data["seg"].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_function(outputs, labels)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            epoch_len = len(train_ds) // train_loader.batch_size
            print(f"step/epoch_len, train_loss: loss.item():.4f")
        epoch_loss /= step
        epoch_loss_values.append(epoch_loss)
        print(f"epoch epoch + 1 average loss: epoch_loss:.4f")
    print(f"train completed, epoch losses: epoch_loss_values")
    torch.save(model.state_dict(), "final_model.pth")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--dir", default="./testdata", type=str, help="directory to create random data")
    args = parser.parse_args()

    train(args=args)

if __name__ == "__main__":
    import os
    os.environ["CUDA_VISIBLE_DEVICES"] = '0,1'
    main()

这里我们使用了2块GPU,运行的时候可以看下两块GPU的使用情况.

DDP(DistributedDataParallel)

本次重点介绍:⭐️⭐️⭐️⭐️⭐️

与 DP 模式不同,DDP 模式本身是为多机多卡设计的,当然在单机多卡的情况下也可以使用。DDP 采用的是 all-reduce 架构,基本解决了 PS 架构中通信成本与 GPU 的数量线性相关的问题。虽然在单机多卡情况下,可以使用 DP 模式,但是使用 DDP 通常会比 DP 模式快一些,因此 DDP 模式也是官方推荐大家使用的方式。

DDP为基于torch.distributed的分布式数据并行结构,工作机制为:在batch维度上对数据进行分组,将输入的数据分配到指定的设备(GPU)上,从而将程序的模型并行化。对应的,每个GPU上会复制一个模型的副本,负责处理分配到的数据,在后向传播过程中再对每个设备上的梯度进行平均。

缺点:代码改动较DP多,坑较多,需要试错攒经验

DDP的启动方式有很多种,内容上是统一的:都是启动多进程来完成运算。

  • torch.multiprocessing.spawn:适用于单价多卡
  • torch.distributed.launch: 可用于多机或多卡。

接下来以torch.distributed.launch为例,只需要8步,将你的代码改成分布式训练。适用于单机多卡。

step 1: 初始化进程

import torch.distributed as dist
dist.init_process_group(backend="nccl", init_method="env://")

参数解析:
在创建模型之前进行初始化

  • backend: 后端通信可以采用mpi, gloo,and nccl。对于基于 GPU 的训练,建议使用 nccl 以获得最佳性能.
  • init_method: 告知每个进程如何发现彼此,如何使用通信后端初始化和验证进程组。 默认情况下,如果未指定 init_method,PyTorch 将使用环境变量初始化方法 (env://)。

    图片左边为常规代码,右边为DDP修改后的代码

step 2: 在创建dataloder前加一个sampler

from monai.data import DistributedSampler
 # create a training data loader
train_ds = Dataset(data=train_files, transform=train_transforms)
# create a training data sampler
train_sampler = DistributedSampler(dataset=train_ds, even_divisible=True, shuffle=True)
# use batch_size=2 to load images and use RandCropByPosNegLabeld to generate 2 x 4 images for network training
train_loader = DataLoader(
    train_ds,
    batch_size=10,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
    sampler=train_sampler,
)

与常规训练的区别👇红框内为新增内容

step 3: 设定Device

device = torch.device(f"cuda:args.local_rank")
torch.cuda.set_device(device)

这里涉及到的参数在后面(step 7)给出
这一步,常规的代码也可以这样写,但平时一般用如下方法

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

step 4: 使用DistributedDataParallel模块包裹模型

from torch.nn.parallel import DistributedDataParallel
model = monai.networks.nets.UNet(
        spatial_dims=3,
        in_channels=1,
        out_channels=1,
        channels=(16, 32, 64, 128, 256),
        strides=(2, 2, 2, 2),
        num_res_units=2,
    ).to(device)
model = DistributedDataParallel(model, device_ids=[device], output_device=[device])

model.to(device) # 这句不能少,最好不要用model.cuda()

与常规对比

step 5: 在epoch训练前设置set_epoch

train_sampler.set_epoch(epoch)


在后面解释为什么要设置set_epoch

step 6: 修改模型的保存方式

因为每个进程的模型是一样的,我们只用在某一个进程上保存模型就行,默认使用0进程保存模型

if dist.get_rank() == 0:
    # all processes should see same parameters as they all start from same
    # random parameters and gradients are synchronized in backward passes,
    # therefore, saving it in one process is sufficient
    torch.save(model.state_dict(), "final_model.pth")

step 7: 在main函数里面添加local_rank参数

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--dir", default="./testdata", type=str, help="directory to create random data")
    # must parse the command-line argument: ``--local_rank=LOCAL_PROCESS_RANK``, which will be provided by DDP
    parser.add_argument("--local_rank", type=int, default=0)
    args = parser.parse_args()

    train(args=args)

7步已经改完了所有代码,接下来copy完整代码,尝试跑通💪💪

完整代码如下

import argparse
import os
import sys
from glob import glob

import nibabel as nib
import numpy as np
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel

import monai
from monai.data import DataLoader, Dataset, create_test_image_3d, DistributedSampler
from monai.transforms import (
    AsChannelFirstd,
    Compose,
    LoadImaged,
    RandCropByPosNegLabeld,
    RandRotate90d,
    ScaleIntensityd,
    EnsureTyped,
)


def train(args):
    # disable logging for processes except 0 on every node
    if args.local_rank != 0:
        f = open(os.devnull, "w")
        sys.stdout = sys.stderr = f
    elif not os.path.exists(args.dir):
        # create 40 random image, mask paris for training
        print(f"generating synthetic data to args.dir (this may take a while)")
        os.makedirs(args.dir)
        # set random seed to generate same random data for every node
        np.random.seed(seed=0)
        for i in range(100):
            im, seg = create_test_image_3d(128, 128, 128, num_seg_classes=1, channel_dim=-1)
            n = nib.Nifti1Image(im, np.eye(4))
            nib.save(n, os.path.join(args.dir, f"imgi:d.nii.gz"))
            n = nib.Nifti1Image(seg, np.eye(4))
            nib.save(n, os.path.join(args.dir, f"segi:d.nii.gz"))

    # initialize the distributed training process, every GPU runs in a process
    dist.init_process_group(backend="nccl", init_method="env://")

    images = sorted(glob(os.path.join(args.dir, "img*.nii.gz")))
    segs = sorted(glob(os.path.join(args.dir, "seg*.nii.gz")))
    train_files = ["img": img, "seg": seg for img, seg in zip(images, segs)]

    # define transforms for image and segmentation
    train_transforms = Compose(
        [
            LoadImaged(keys=["img", "seg"]),
            AsChannelFirstd(keys=["img", "seg"], channel_dim=-1),
            ScaleIntensityd(keys="img"),
            RandCropByPosNegLabeld(
                keys=["img", "seg"], label_key="seg", spatial_size=[96, 96, 96], pos=1, neg=1, num_samples=4
            ),
            RandRotate90d(keys=["img", "seg"], prob=0.5, spatial_axes=[0, 2]),
            EnsureTyped(keys=["img", "seg"]),
        ]
    )

    # create a training data loader
    train_ds = Dataset(data=train_files, transform=train_transforms)
    # create a training data sampler
    train_sampler = DistributedSampler(dataset=train_ds, even_divisible=True, shuffle=True)
    # use batch_size=2 to load images and use RandCropByPosNegLabeld to generate 2 x 4 images for network training
    train_loader = DataLoader(
        train_ds,
        batch_size=10,
        shuffle=False,
        num_workers=2,
        pin_memory=True,
        sampler=train_sampler,
    )

    # create UNet, DiceLoss and Adam optimizer
    device = torch.device(f"cuda:args.local_rank")
    torch.cuda.set_device(device)
    model = monai.networks.nets.UNet(
        spatial_dims=[理论+实操] MONAI&PyTorch 如何进行分布式训练,详细介绍DP和DDP

Nginx——反向代理 & 负载均衡(无理论,案例实操)

Nginx——反向代理 & 负载均衡(无理论,案例实操)

MONAI Label 安装流程及使用攻略

MONAI Label 安装流程及使用攻略

pytorch实现简单的分类器