ResNet实战:单机多卡DDP方式混合精度训练
Posted AI浩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ResNet实战:单机多卡DDP方式混合精度训练相关的知识,希望对你有一定的参考价值。
文章目录
摘要
本例提取了植物幼苗数据集中的部分数据做数据集,数据集共有12种类别,模型使用最经典的resnet50,演示如何实现混合精度训练以及如何使用DDP的方式实现多卡并行训练。
通过本文你和学到:
1、如何使用混合精度训练?
2、如何制作ImageNet数据集?
3、如何使用DDP方式的进行多卡训练?
4、如何使用Mixup数据增强。
5、如何进行多卡BN同步?
6、如何使用余弦退火调整学习率?
7、如何使用classification_report实现对模型的评价。
8、预测的两种写法。
apex
使用apex实现混合精度训练,具体安装方法见:
https://blog.csdn.net/hhhhhhhhhhwwwwwwwwww/article/details/120839608
DP和DDP
pytorch中的有两种分布式训练方式,一种是常用的DataParallel(DP),另外一种是DistributedDataParallel(DDP),两者都可以用来实现数据并行方式的分布式训练,DP采用的是PS模式,DDP采用的是ring-all-reduce模式,两种分布式训练模式主要区别如下:
1、DP是单进程多线程的实现方式,DDP是采用多进程的方式。
2、DP只能在单机上使用,DDP单机和多机都可以使用。
3、DDP相比于DP训练速度要快。
Parameter Server架构(PS模式)
Parameter Server架构(PS模式)由server节点和worker节点组成。
server节点的主要功能是初始化和保存模型参数、接受worker节点计算出的局部梯度、汇总计算全局梯度,并更新模型参数(DP)。
worker节点的主要功能是各自保存部分训练数据,初始化模型,从server节点拉取最新的模型参数(pull),再读取参数,根据训练数据计算局部梯度,上传给server节点(push)。
详细的计算过程如下:
PS模式下的DP,会造成负载不均衡,因为充当server的GPU需要一定的显存用来保存worker节点计算出的局部梯度;另外server还需要将更新后的模型参数broadcast到每个worker,server的带宽就成了server与worker之间的通信瓶颈,server与worker之间的通信成本会随着worker数目的增加而线性增加。
ring-all-reduce模式
ring-all-reduce模式没有server节点,worker与worker之间的通信构成一个环。
ring-all-reduce模式下,所有worker只和自己相邻的两个worker进行通信,该工作模式分为两个工作阶段:
-
Scatter Reduce:在这个 Scatter Reduce阶段,GPU 会逐步交换彼此的梯度并融合,最后每个 GPU 都会包含完整融合梯度的一部分
-
Allgather:GPU 会逐步交换彼此不完整的融合梯度,最后所有 GPU 都会得到完整的融合梯度
计算过程如下:
DDP的基本用法 (代码编写流程)
-
- 使用 torch.distributed.init_process_group 初始化进程组
- 使用 torch.nn.parallel.DistributedDataParallel 创建 分布式模型
- 使用 torch.utils.data.distributed.DistributedSampler 创建 DataLoader
- 调整其他必要的地方(tensor放到指定device上,S/L checkpoint,指标计算等)
- 使用 torch.distributed.launch / torch.multiprocessing 或 slurm 开始训练
Mixup
为了提高成绩我在代码中加入Mixup这种增强方式。使用到了timm,安装命令:
pip install timm
导入包:from timm.data.mixup import Mixup,
定义Mixup,和SoftTargetCrossEntropy
mixup_fn = Mixup(
mixup_alpha=0.8, cutmix_alpha=1.0, cutmix_minmax=None,
prob=0.1, switch_prob=0.5, mode='batch',
label_smoothing=0.1, num_classes=12)
criterion_train = SoftTargetCrossEntropy()
项目结构
resnet_demo
├─data
│ ├─Black-grass
│ ├─Charlock
│ ├─Cleavers
│ ├─Common Chickweed
│ ├─Common wheat
│ ├─Fat Hen
│ ├─Loose Silky-bent
│ ├─Maize
│ ├─Scentless Mayweed
│ ├─Shepherds Purse
│ ├─Small-flowered Cranesbill
│ └─Sugar beet
├─mean_std.py
├─makedata.py
├─train.py
├─test1.py
└─test.py
mean_std.py:计算mean和std的值。
makedata.py:生成数据集。
计算mean和std
为了使模型更加快速的收敛,我们需要计算出mean和std的值,新建mean_std.py,插入代码:
from torchvision.datasets import ImageFolder
import torch
from torchvision import transforms
def get_mean_and_std(train_data):
train_loader = torch.utils.data.DataLoader(
train_data, batch_size=1, shuffle=False, num_workers=0,
pin_memory=True)
mean = torch.zeros(3)
std = torch.zeros(3)
for X, _ in train_loader:
for d in range(3):
mean[d] += X[:, d, :, :].mean()
std[d] += X[:, d, :, :].std()
mean.div_(len(train_data))
std.div_(len(train_data))
return list(mean.numpy()), list(std.numpy())
if __name__ == '__main__':
train_dataset = ImageFolder(root=r'data1', transform=transforms.ToTensor())
print(get_mean_and_std(train_dataset))
数据集结构:
运行结果:
([0.3281186, 0.28937867, 0.20702125], [0.09407319, 0.09732835, 0.106712654])
把这个结果记录下来,后面要用!
生成数据集
我们整理还的图像分类的数据集结构是这样的
data
├─Black-grass
├─Charlock
├─Cleavers
├─Common Chickweed
├─Common wheat
├─Fat Hen
├─Loose Silky-bent
├─Maize
├─Scentless Mayweed
├─Shepherds Purse
├─Small-flowered Cranesbill
└─Sugar beet
pytorch和keras默认加载方式是ImageNet数据集格式,格式是
├─data
│ ├─val
│ │ ├─Black-grass
│ │ ├─Charlock
│ │ ├─Cleavers
│ │ ├─Common Chickweed
│ │ ├─Common wheat
│ │ ├─Fat Hen
│ │ ├─Loose Silky-bent
│ │ ├─Maize
│ │ ├─Scentless Mayweed
│ │ ├─Shepherds Purse
│ │ ├─Small-flowered Cranesbill
│ │ └─Sugar beet
│ └─train
│ ├─Black-grass
│ ├─Charlock
│ ├─Cleavers
│ ├─Common Chickweed
│ ├─Common wheat
│ ├─Fat Hen
│ ├─Loose Silky-bent
│ ├─Maize
│ ├─Scentless Mayweed
│ ├─Shepherds Purse
│ ├─Small-flowered Cranesbill
│ └─Sugar beet
新增格式转化脚本makedata.py,插入代码:
import glob
import os
import shutil
image_list=glob.glob('data1/*/*.png')
print(image_list)
file_dir='data'
if os.path.exists(file_dir):
print('true')
#os.rmdir(file_dir)
shutil.rmtree(file_dir)#删除再建立
os.makedirs(file_dir)
else:
os.makedirs(file_dir)
from sklearn.model_selection import train_test_split
trainval_files, val_files = train_test_split(image_list, test_size=0.3, random_state=42)
train_dir='train'
val_dir='val'
train_root=os.path.join(file_dir,train_dir)
val_root=os.path.join(file_dir,val_dir)
for file in trainval_files:
file_class=file.replace("\\\\","/").split('/')[-2]
file_name=file.replace("\\\\","/").split('/')[-1]
file_class=os.path.join(train_root,file_class)
if not os.path.isdir(file_class):
os.makedirs(file_class)
shutil.copy(file, file_class + '/' + file_name)
for file in val_files:
file_class=file.replace("\\\\","/").split('/')[-2]
file_name=file.replace("\\\\","/").split('/')[-1]
file_class=os.path.join(val_root,file_class)
if not os.path.isdir(file_class):
os.makedirs(file_class)
shutil.copy(file, file_class + '/' + file_name)
训练
完成上面的步骤后,就开始train脚本的编写,新建train.py.
导入项目使用的库
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from sklearn.metrics import classification_report
from timm.data.mixup import Mixup
from timm.loss import SoftTargetCrossEntropy
from torchvision.models.resnet import resnet50
from apex import amp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import os
from apex.parallel import convert_syncbn_model
设置全局参数
设置学习率、BatchSize、epoch等参数。
# 设置全局参数
model_lr = 1e-4
BATCH_SIZE = 256
EPOCHS = 1000
use_amp=False #是否使用混合精度
classes=12
CLIP_GRAD=5.0
is_distributed=True
model_lr:学习率,根据实际情况做调整。
BATCH_SIZE:batchsize,根据显卡的大小设置。
EPOCHS:epoch的个数,一般300够用。
use_amp:是否使用混合精度。
classes:类别个数。
CLIP_GRAD:梯度的最大范数,在梯度裁剪里设置。
rank:默认是0。
is_distributed:是否是分布式?
设置distributed
使用nccl的方式初始化初始化进程组。
# 0. set up distributed device
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(rank % torch.cuda.device_count())
dist.init_process_group(backend="nccl")
device = torch.device("cuda", local_rank)
print(f"[init] == local rank: local_rank, global rank: rank ==")
进程组的参数介绍:
GROUP:进程组,大部分情况下DDP的各个进程是在同一个进程组下。
WORLD_SIZE:总的进程数量 (原则上一个process占用一个GPU是较优的)。
RANK:当前进程的序号,用于进程间通讯,rank = 0 的主机为 master 节点。
LOCAL_RANK:当前进程对应的GPU号。
图像预处理与增强
数据处理比较简单,加入了Cutout、做了Resize和归一化,定义Mixup函数。
# 数据预处理7
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.51819474, 0.5250407, 0.4945761], std=[0.24228974, 0.24347611, 0.2530049])
])
transform_test = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.51819474, 0.5250407, 0.4945761], std=[0.24228974, 0.24347611, 0.2530049])
])
mixup_fn = Mixup(
mixup_alpha=0.8, cutmix_alpha=1.0, cutmix_minmax=None,
prob=0.1, switch_prob=0.5, mode='batch',
label_smoothing=0.1, classes=12)
读取数据
使用pytorch默认读取数据的方式,然后将dataset_train.class_to_idx打印出来,预测的时候要用到。
# 读取数据
train_dataset = datasets.ImageFolder('data/train', transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,shuffle=True)
val_dataset = datasets.ImageFolder("data/val", transform=transform_test)
print(train_dataset.class_to_idx)
# 导入数据
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=train_sampler)
test_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
设置模型
- 设置loss函数,train的loss为:SoftTargetCrossEntropy,val的loss:nn.CrossEntropyLoss()。
- 设置模型为resnet50,预训练设置为true,num_classes设置为12。
- 使用convert_sync_batchnorm函数实现多卡之间的BN同步。
- 创建DDP方式的多卡训练。
- 优化器设置为adam。
- 学习率调整策略选择为余弦退火。
- 如果使用混合精度,则将amp初始化为“O1”。
# 实例化模型并且移动到GPU
criterion_train = SoftTargetCrossEntropy()
criterion_val = torch.nn.CrossEntropyLoss()
model_ft = resnet50(pretrained=True)
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, classes)
#model_ft=convert_syncbn_model(model_ft)#使用apex同步BN
model_ft=torch.nn.SyncBatchNorm.convert_sync_batchnorm(model_ft)
model_ft.to(device)
print(model_ft)
# DistributedDataParallel
model_ft = DDP(model_ft, device_ids=[local_rank], output_device=local_rank)
# 选择简单暴力的Adam优化器,学习率调低
optimizer = optim.Adam(model_ft.parameters(), lr=model_lr)
cosine_schedule = optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=20, eta_min=1e-6)
if use_amp:
model, optimizer = amp.initialize(model_ft, optimizer, opt_level="O1") # 这里是“欧一”,不是“零一”
定义训练和验证函数
定义训练函数和验证函数。在训练函数中:
首先 ,调用set_epoch,每个 epoch 开始时调用 set_epoch() 方法,然后再创建 DataLoader 迭代器。
然后,初始化loss,开始遍历train_loader。
判断batch中有没有奇数的情况,如果有则减去一因为MixUp的loss需要偶数才行。
将数据放入转为cuda,计算mixup。
将data输入model得到输出结果,然后计算loss。
判断是否使用混合精度,如果使用则调用scaled_loss.backward()没有则调用loss.backward()。
clip_grad_norm_()执行梯度裁剪,防止梯度爆炸。
在一个epoch完成后,使用classification_report计算详细的得分情况。
# 定义训练过程
def train(model, device, train_loader, optimizer, epoch):
if rank == 0:
print(" ======= Training ======= \\n")
train_sampler.set_epoch(epoch)
model.train()
sum_loss = 0
total_num = len(train_loader.dataset)
print(total_num, len(train_loader))
for batch_idx, (data, target) in enumerate(train_loader):
if len(data)%2!=0:
print(len(data))
data=data[0:len(data)-1]
target=target[0:len(target)-1]
print(len(data))
data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
samples, targets = mixup_fn(data, target)
output = model(data)
loss = criterion_train(output, targets)
optimizer.zero_grad()
if use_amp:
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
grad_norm = torch.nn.utils.clip_grad_norm_(amp.master_params(optimizer), CLIP_GRAD)
else:
loss.backward()
grad_norm = torch.nn.utils.clip_grad_norm_(model.parameters(),CLIP_GRAD)
#torch.nn.utils.clipgrad_norm() 的使用应该在loss.backward() 之后,optimizer.step()之前.
#注意这个方法只在训练的时候使用,在测试的时候验证和测试的时候不用。
optimizer.step()
lr = optimizer.state_dict()['param_groups'][0]['lr']
print_loss = loss.data.item()
sum_loss += print_loss
if (batch_idx + 1) % 10 == 0:
print('Train Epoch: [/ (:.0f%)]\\tLoss: :.6f\\tLR::.9f'.format(
epoch, (batch_idx + 1) * len(data), len(train_loader.dataset),
100. * (batch_idx + 1) / len(train_loader), loss.item(), lr))
ave_loss = sum_loss / len(train_loader)
print('epoch:,loss:'.format(epoch, ave_loss))
ACC = 0
# 验证过程
def val(model, device, test_loader):
global ACC
model.eval()
test_loss = 0
correct = 0
total_num = len(test_loader.dataset)
print(total_num, len(test_loader))
val_list = []
pred_list = []
with torch.no_grad():
for data, target in test_loader:
for t in target:
val_list.append(t.data.item())
data, target = data.to(device), target.to(device)
output = model(data)
loss = criterion_val(output, target)
_, pred = torch.max(output.data, 1)
for p in pred:
pred_list.append(p.data.item())
correct += torch.sum(pred == target)
print_loss = loss.data.item()
test_loss += print_loss
correct = correct.data.item()
acc = correct / total_num
avgloss = test_loss / len(test_loader)
print('\\nVal set: Average loss: :.4f, Accuracy: / (:.0f%)\\n'.format(
avgloss, correct, len(test_loader.dataset), 100 * acc))
if acc > ACC:
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
torch.save(model.module, 'model_' + str(epoch) + '_' + str(round(acc, 3)) + '.pth')
else:
torch.save(model, 'model_' + str(epoch) + '_' + str(round(acc, 3)) + '.pth')
ACC = acc
return val_list, pred_list
# 训练
is_set_lr = False
for epoch in range(1, EPOCHS + 1):
train(model_ft, device, train_loader, optimizer, epoch)
if epoch < 600:
cosine_schedule.step()
else:
if is_set_lr:
continue
for param_group in以上是关于ResNet实战:单机多卡DDP方式混合精度训练的主要内容,如果未能解决你的问题,请参考以下文章
简单介绍pytorch中分布式训练DDP使用 (结合实例,快速入门)
简单介绍pytorch中分布式训练DDP使用 (结合实例,快速入门)