使用Pytorch实现Transformer,如何巧妙的使用或者停用 optimizer.zero_grad()来训练大模型?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Pytorch实现Transformer,如何巧妙的使用或者停用 optimizer.zero_grad()来训练大模型?相关的知识,希望对你有一定的参考价值。

参考技术A optimizer.zero_grad()意思是把梯度置零,也就是把loss关于weight的导数变成0.
pytorch对于每个batch大都执行了这样的操作:
optimizer.zero_grad() ## 梯度清零preds = model(inputs) ## inference
loss = criterion(preds, targets) ## 求解loss
loss.backward() ## 反向传播求解梯度
optimizer.step() ## 更新权重参数

1,由于pytorch的动态计算图,当我们使用loss.backward()和opimizer.step()进行梯度下降更新参数的时候,梯度并不会自动清零。并且这两个操作是独立操作。
2,backward():反向传播求解梯度。
3,step():更新权重参数。
参考技术B optimizer.zero_grad()意思是把梯度置零,也就是把loss关于weight的导数变成0.
pytorch对于每个batch大都执行了这样的操作:
optimizer.zero_grad() ## 梯度清零preds = model(inputs) ## inference
loss = criterion(preds, targets) ## 求解loss
loss.backward() ## 反向传播求解梯度
optimizer.step() ## 更新权重参数

1,由于pytorch的动态计算图,当我们使用loss.backward()和opimizer.step()进行梯度下降更新参数的时候,梯度并不会自动清零。并且这两个操作是独立操作。
2,backward():反向传播求解梯度。
3,step():更新权重参数。

Swin Transformer模型——pytorch实现

论文传送门:Swin Transformer: Hierarchical Vision Transformer using Shifted Windows
前置文章:ViT模型——pytorch实现

Swin Transformer的特点:

相较于ViT:
①采用逐渐递增的下采样倍数,获得具有层次的特征图(hierarchical feature maps),便于进行检测和分割任务;
②引入W-MSA(Windows Multi-Head Self-Attention)和SW-MSA(Shifted Windows Multi-Head Self-Attention),减少了计算量。

W-MSA和SW-MSA:

W-MSA是将特征图划分成一个个Window,然后在每个Window中进行Patch的划分和Attention的计算,这样可以减少计算量,但同时也使得不同Window之间无法进行信息交互
因此,作者又提出SW-MSA,即偏移的W-MSA,具体做法为将原本划分Windows的网格向右、向下平移window_size//2长度,然后通过平移拼接将小块的Window拼接成整块Window,从而保证与W-MSA的Windows数量相同,同时生成mask,将原本不相邻的区域的tokens(patches)设为-100,在Attention计算时,与attention( Q K T QK^T QKT)相加,使得不相邻的区域的attention经过softmax后趋近于0,避免不相邻区域的干扰,经过Attention后再将拼接的Windows拆分,反向平移还原回原特征图。

Relative Position Bias:

在Attention计算中,引入一个偏置项B,B从一个可训练的矩阵(relative positon bias matrix)中取,索引为每个token(patch)的相对位置索引(经过一些变换)。

Swin Transformer的结构:

Patch Partition:将输入图片用4x4的窗口划分,并在Channel通道堆叠,代码中使用Conv2d实现;
Linear Embedding:将H和W维度展平;
(代码中将Patch Partition和Linear Embedding通过一个Patch Embedding实现)
Swin Transformer Block:成对出现,整个结构与ViT中的Transformer Block相同,只是把MSA替换成了W-MSA和SW-MSA,第奇数个Block使用W-MSA,第偶数个Block使用SW-MSA(二者交替使用);
Patch Merging:下采样方法,类似focus,每次Patch Merging先使高H、宽W减半,通道C翻4倍,然后通过一个Linear将C减半,即最后C为原来的2倍;
Stage:Linear Embedding/Patch Merging + L * Swin Transformer Block

不同规模的Swin Transformer模型:

import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange


def drop_path_f(x, drop_prob: float = 0., training: bool = False):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).

    This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
    the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
    See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
    changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
    'survival rate' as the argument.

    """
    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)  # work with diff dim tensors, not just 2D ConvNets
    random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_()  # binarize
    output = x.div(keep_prob) * random_tensor
    return output


class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample  (when applied in main path of residual blocks).
    """

    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        return drop_path_f(x, self.drop_prob, self.training)


class PatchEmbedding(nn.Module):  # Patch Partition + Linear Embedding
    def __init__(self, patch_size=4, in_channels=3, emb_dim=96):
        super(PatchEmbedding, self).__init__()
        self.conv = nn.Conv2d(in_channels, emb_dim, patch_size, patch_size)  # 4x4卷积实现Patch Partition

    def forward(self, x):
        # (B,C,H,W)
        x = self.conv(x)
        _, _, H, W = x.shape
        x = rearrange(x, "B C H W -> B (H W) C")  # Linear Embedding
        return x, H, W


class MLP(nn.Module):  # MLP
    def __init__(self, in_dim, hidden_dim=None, drop_ratio=0.):
        super(MLP, self).__init__()
        if hidden_dim is None:
            hidden_dim = in_dim * 4  # linear的hidden_dims默认为in_dims的4倍

        self.fc1 = nn.Linear(in_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, in_dim)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(drop_ratio)

    def forward(self, x):
        # Linear + GELU + Dropout + Linear + Dropout
        x = self.fc1(x)
        x = self.gelu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.dropout(x)
        return x


class WindowMultiHeadSelfAttention(nn.Module):  # W-MSA / SW-MSA
    def __init__(self, dim, window_size, num_heads,
                 attn_drop_ratio=0., proj_drop_ratio=0.):
        super(WindowMultiHeadSelfAttention, self).__init__()
        self.window_size = window_size
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.qkv = nn.Linear(dim, dim * 3)
        self.attn_drop = nn.Dropout(attn_drop_ratio)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop_ratio)

        # 创建Relative position bias matrix,其参数可训练,根据Relative position index取其中的值作为B
        self.relative_position_bias_matrix = nn.Parameter(torch.zeros((2 * window_size - 1) ** 2, num_heads))
        # 使用register_buffer,使得relative_position_index可以随model.state_dict()保存,并可以随model.cuda()加载至GPU
        self.register_buffer("relative_position_index", self._get_relative_position_index())

    def _get_relative_position_index(self):  # 创建Relative position index
        coords = torch.flatten(
            torch.stack(
                torch.meshgrid([torch.arange(self.window_size), torch.arange(self.window_size)], indexing="ij"), dim=0
            ), 1
        )
        relative_coords = coords[:, :, None] - coords[:, None, :]
        relative_coords += self.window_size - 1
        relative_coords[0, :, :] *= 2 * self.window_size - 1
        relative_position_index = relative_coords.sum(0)
        return relative_position_index.view(-1)

    def forward(self, x, mask=None):
        qkv = self.qkv(x)
        qkv = rearrange(qkv, "B P (C H d) -> C B H P d", C=3, H=self.num_heads, d=self.head_dim)
        q, k, v = qkv[0], qkv[1], qkv[2]
        k = rearrange(k, "B H P d -> B H d P")
        # Attention(Q, K, V ) = softmax(QKT/dk)V (T表示转置)
        attn = torch.matmul(q, k) * self.head_dim ** -0.5  # QKT/dk

        bias = self.relative_position_bias_matrix[self.relative_position_index]
        bias = rearrange(bias, "(P1 P2) H -> 1 H P1 P2", P1=self.window_size ** 2, P2=self.window_size ** 2)
        attn += bias  # QKT/dk + B

        if mask is not None:
            # 如果mask不为None,对attn进行加和,使得在原图上不相邻的token对应的attn-100,经过softmax后趋近于0
            attn = rearrange(attn, "(B NW) H P1 P2 -> B NW H P1 P2", NW=mask.shape[0])
            mask = rearrange(mask, "NW P1 P2 -> 1 NW 1 P1 P2")
            attn += mask
            attn = rearrange(attn, "B NW H P1 P2 -> (B NW) H P1 P2")

        attn = F.softmax(attn)  # softmax(QKT/dk + B)

        attn = self.attn_drop(attn)

        x = torch.matmul(attn, v)  # softmax(QKT/dk + B)V
        x = rearrange(x, "B H P d -> B P (H d)")
        x = self.proj(x)
        x = self.proj_drop(x)

        return x


class SwinTransformerBlock(nn.Module):  # Swin Transformer Block
    def __init__(self, dim, num_heads, window_size=7, shift=True,
                 attn_drop_ratio=0., proj_drop_ratio=0., drop_path_ratio=0.):
        super(SwinTransformerBlock, self).__init__()
        self.num_heads = num_heads
        self.window_size = window_size
        self.shift_size = window_size // 2 if shift else 0.  # 不进行shift时,shift_size取0
        self.layernorm1 = nn.LayerNorm(dim)
        self.attn = WindowMultiHeadSelfAttention(dim, self.window_size, self.num_heads,
                                                 attn_drop_ratio=attn_drop_ratio,
                                                 proj_drop_ratio=proj_drop_ratio)
        self.droppath = DropPath(drop_path_ratio) if drop_path_ratio > 0. else nn.Identity()
        self.layernorm2 = nn.LayerNorm(dim)
        self.mlp = MLP(dim)

    def _create_mask(self, H, W, device):  # 创建mask
        mask = torch.zeros((1, 1, H, W), device=device)
        slices = (slice(0, -self.window_size),
                  slice(-self.window_size, -self.shift_size),
                  slice(-self.shift_size, None))
        count = 0
        for h in slices:
            for w in slices:
                mask[:, :, h, w] = count
                count += 1

        mask = rearrange(mask, "1 1 (H Hs) (W Ws) -> (H W) (Hs Ws)", Hs=self.window_size, Ws=self.window_size)
        attn_mask = mask.unsqueeze(1) - mask.unsqueeze(2)
        attn_mask = attn_mask.masked_fill(attn_mask != 0, float(-100.))  # 在原图上不相邻的token,mask为-100.
        attn_mask = attn_mask.masked_fill(attn_mask == 0, float(0.))  # 在原图上相邻的token,mask为0.
        return attn_mask

    def forward(self, input: tuple):
        x, H, W = input
        shortcut = x
        x = self.layernorm1(x)
        x = rearrange(x, "B (H W) C -> B C H W", H=H, W=W)
        if self.shift_size > 0.:  # 如果偏移量shift_size>0.,则对x进行偏移,同时创建对应的mask
            x = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(2, 3))
            mask = self._create_mask(H, W, device=x.device)
        else:
            mask = None
        num_windows = (x.shape[2] // self.window_size, x.shape[3] // self.window_size)
        # x = rearrange(x, "B C (H Hs) (W Ws) -> (B H W) C Hs Ws", Hs=self.window_size, Ws=self.window_size)
        x = rearrange(x, "B C (H Hs) (W Ws) -> (B H W) (Hs Ws) C", Hs=self.window_size, Ws=self.window_size)
        x = self.attn(x, mask)
        # x = rearrange(x, "(B H W) C Hs Ws -> B C (H Hs) (W Ws)", Hs=self.window_size, Ws=self.window_size)
        x = rearrange(x, "(B H W) (Hs Ws) C -> B C (H Hs) (W Ws)", H=num_windows[0], W=num_windows[1],
                      Hs=self.window_size, Ws=self.window_size)
        if self.shift_size > 0.:  # 如果偏移量shift_size>0.,则将偏移过的x调整回原来的位置
            x = torch.roll(x, shifts=(self.shift_size, self.shift_size), dims=(2, 3))

        x = rearrange(x, "B C H W -> B (H W) C", H=H, W=W)
        x = shortcut + self.droppath(x)  # 残差连接
        shortcut = x
        x = self.layernorm2(x)
        x = self.mlp(x)
        x = shortcut + self.droppath(x)  # 残差连接
        return x, H, W


class PatchMerging(nn.Module):  # Patch Merging
    def __init__(self, dim):
        super(PatchMerging, self).__init__()
        self.layernorm = nn.LayerNorm(4 * dim)
        self.linear = nn.Linear(4 * dim, 2 * dim, bias=False)

    def forward(self, input: tuple):
        # (B,L,C) --> (B,C,H,W) --> (B,4*C,H/2,W/2) --> (B,L/4,4*C) --> (B,L/4,2*C)
        x, H, W = input
        x = rearrange(x, "B (H W) C -> B C H W", H=H, W=W)
        x = torch.cat([x[:, :, 0::2, 0::2], x[:, :, 1::2, 0::2], x[:, :, 0::2, 1::2], x[:, :, 1::2, 1::2]], dim=1)
        _, _, H, W = x.shape
        x = rearrange(x, "B C H W -> B (H W) C")
        x = self.layernorm(x)
        x = self.linear(x)
        return x, H, W


class SwinHead(nn.Module):  # Swin Head,分类任务的Head
    def __init__(self, dim, num_classes):
        super(SwinHead, self).__init__()
        self.layernorm = nn.LayerNorm(dim)
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.mlphead = nn.Linear(dim, num_classes)

    def forward(self, x):
        x = self.layernorm(x)
        x = rearrange(x, "B L C -> B C L")
        x = self.avgpool(x)
        return self.mlphead(x.squeeze())


class SwinTransformer(nn.Module):  # Swin Transformer
    def __init__(self, dims=(96, 192, 384, 768), num_blocks=(2, 2, 6, 2), num_heads=(3, 6, 12, 24),
                 num_classes=1000,
                 pos_drop_ratio=0., attn_drop_ratio=0., proj_drop_ratio=0., drop_path_ratio_max=0.1):
        super(SwinTransformer, self).__init__()
        self.patchembedding = PatchEmbedding(emb_dim=dims[0])
        self.pos_drop = nn.Dropout(pos_drop_ratio)
        # #drop path ratio从0递增至drop_path_ratio_max
        drop_path_ratio = [i.item() for i in torch.linspace(0, drop_path_ratio_max, sum(num_blocks))]
        self.blocks1 = nn.Sequential(
            *[SwinTransformerBlock(dims[0], num_heads[0], shift=(i % 2 != 0),
                                   attn_drop_ratio=attn_drop_ratio,
                                   proj_drop_ratio=proj_drop_ratio,
                                   drop_path_ratio=drop_path_ratio[i + sum(num_blocks[:0])])
              for以上是关于使用Pytorch实现Transformer,如何巧妙的使用或者停用 optimizer.zero_grad()来训练大模型?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用具有多维序列到序列的 PyTorch Transformer?

基于Pytorch,从头开始实现Transformer(编码器部分)

pytorch实现 vision_transformer

pytorch实现 vision_transformer

Swin Transformer模型——pytorch实现

Pytorch动手实现Transformer机器翻译