Transformer详解
Posted Twq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Transformer详解相关的知识,希望对你有一定的参考价值。
1.理论知识讲解
transfromer这个模型在机器翻译方面就是做如下事情由一种语言到另一种语言
下图中六个encoder在结构上是完全相同的但是每个encoder的内部的参数不完全相同,也就是在训练的时候6个encoder都在训练,并不是一个在训练,然后其它五个去拷贝这个encoder,六个decoder的结构也是完全相同的;每个encoder和decoder是不一样的
1.1 Encoder
1.1.1输入部分
该部分分为两个小部分1⃣️Embedding
下图中有12个字,按字切分,然后每个字定义一个512维的字向量(可以使用word2vec或者随机初始化)
2⃣️位置嵌入
使用transformer为什么需要位置嵌入,因为在encoder中的第二个部分中多头注意力机制中是将输入的词并行输入的,并不是像rnn那样是一个词一个词的输入,但是这样并行输入有缺少了rnn那种词与词之间的先后关系,因此需要引入词位置的编码。
下图rnn是一个词一个词的输入,输入“我”,处理“我”,在输入“爱”,在接收“我”的信息之后再处理“爱”,这样保证词与词之间的先后顺序不会乱,但是一个词一个词的输入导致效率低
位置编码公式:
公式中的pos就是单词或者是字的位置,2i和2i+1分别对应,在偶数位置使用sin,在基数位置使用cos。对于爱这个字所在的位置pos,如果爱这个字对应的词向量为512维,则对于这512位的向量偶数位用cos来计算,基数位置用cos来计算
公式中的d_model它表示模型中每个输入和输出 token 的向量表示的维度大小。也就是说,模型中的每个单词都被表示为一个长度为 d_model 的向量,对于一下这个例子d_model的值为512,在我的代码中d_model的值为128。
由公式计算出位置编码之后,与原来的词向量进行相加得到最终整个transformer的输入。
1.1.2注意力机制
1⃣️基本的注意力机制
下图中,对于“婴儿在干嘛”这句话应该关注图片中的哪个区域,通过公式计算得到这句话应该更关注图片中的哪个位置
注意力机制的公式,其中Q,K,V,三个都是矩阵
2⃣️在transformer中怎么操作
x1与wq矩阵相乘的到q1矩阵,依次类推
计算attention值
Score的值等于,你当前关注的词的q,分别乘以这个句子中所有的词的k,就会得到句子中每个词对当前的词的打分结果
1.1.3残差和LayNorm
1⃣️残差
可以有效缓解梯度消失
1.1.3前馈神经网络
1.2decoder
在多头注意力机制中多了一个mask机制,表示将当前单词和之后的单词做mask
为什么使用mask的原因:
在 Transformer 模型中,为了生成下一个目标单词,解码器需要访问已经生成的单词和源序列的信息。在训练时,我们可以将目标序列的所有单词都输入到解码器中,并在每个时间步生成一个单词。但是,在生成过程中,我们需要逐步地生成目标序列中的单词,而在每个时间步中,我们只能访问当前时刻之前生成的单词。这意味着,在生成第 i 个单词时,我们不能使用第 i+1 个单词的信息。因此为了解决这一问题才引入了mask机制
k v矩阵是由encoder得到,而q矩阵是decoder中得到,encoder中得到的每个k v矩阵将会和decoder中的q矩阵进行交互
下图讲述了mask的过程,一行一行的看,当输入的s的时候就只能看到s看不到 后面的“卷”,“起”,“来”,因为此时这三个字的mask为1,当输入“卷”的时候就只能看到“S”和“卷”,看不到后面的两个字,这也是代码中上三角矩阵的作用
2.代码部分讲解
import torch
import numpy as np
from config import ngpu, device
# 计算角度:pos * 1/(10000^(2i/d))
def get_angles(pos, i, d_model):
# 2*(i//2)保证了2i,这部分计算的是1/10000^(2i/d)
angle_rates = 1 / np.power(10000, 2 * (i // 2) / np.float32(d_model)) # => [1, 512]
return pos * angle_rates # [50,1]*[1,512]=>[50, 512]
# np.arange()函数返回一个有终点和起点的固定步长的排列,如[1,2,3,4,5],起点是1,终点是5,步长为1
# 注意:起点终点是左开右闭区间,即start=1,end=6,才会产生[1,2,3,4,5]
# 只有一个参数时,参数值为终点,起点取默认值0,步长取默认值1。
def positional_encoding(position, d_model): #d_model是位置编码的长度,相当于position encoding的embedding_dim?
angle_rads = get_angles(np.arange(position)[:, np.newaxis], # [50, 1]
np.arange(d_model)[np.newaxis, :], # [1, d_model=512]
d_model)
angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2]) #从0开始步长为2,2i
angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2]) #从1开始步长为2,2i+1
pos_encoding = angle_rads[np.newaxis, ...] #[50, 512] => [1,50,512]
return torch.tensor(pos_encoding, dtype=torch.float32)
pos_encoding = positional_encoding(50, 512)
pad = 1 #重要!
def create_padding_mask(seq): # seq [b, seq_len]
# seq = torch.eq(seq, torch.tensor(0)).float() # pad=0的情况
seq = torch.eq(seq, torch.tensor(pad)).float() # pad!=0
return seq[:, np.newaxis, np.newaxis, :] # =>[b, 1, 1, seq_len]
def create_look_ahead_mask(size): # seq_len
mask = torch.triu(torch.ones((size, size)), diagonal=1)
# mask = mask.device() #
return mask # [seq_len, seq_len]
def scaled_dot_product_attention(q, k, v, mask=None):
"""
#计算注意力权重。
q, k, v 必须具有匹配的前置维度。 且dq=dk
k, v 必须有匹配的倒数第二个维度,例如:seq_len_k = seq_len_v。
#虽然 mask 根据其类型(填充或前瞻)有不同的形状,
#但是 mask 必须能进行广播转换以便求和。
#参数:
q: 请求的形状 == (..., seq_len_q, depth)
k: 主键的形状 == (..., seq_len_k, depth)
v: 数值的形状 == (..., seq_len_v, depth_v) seq_len_k = seq_len_v
mask: Float 张量,其形状能转换成
(..., seq_len_q, seq_len_k)。默认为None。
#返回值:
#输出,注意力权重
"""
# matmul(a,b)矩阵乘:a b的最后2个维度要能做乘法,即a的最后一个维度值==b的倒数第2个纬度值,
# 除此之外,其他维度值必须相等或为1(为1时会广播)
matmul_qk = torch.matmul(q, k.transpose(-1, -2)) # 矩阵乘 =>[..., seq_len_q, seq_len_k]
# 缩放matmul_qk
dk = torch.tensor(k.shape[-1], dtype=torch.float32) # k的深度dk,或叫做depth_k
scaled_attention_logits = matmul_qk / torch.sqrt(dk) # [..., seq_len_q, seq_len_k]
# 将 mask 加入到缩放的张量上(重要!)
if mask is not None: # mask: [b, 1, 1, seq_len]
# mask=1的位置是pad,乘以-1e9(-1*10^9)成为负无穷,经过softmax后会趋于0
scaled_attention_logits += (mask * -1e9)
# softmax 在最后一个轴(seq_len_k)上归一化
attention_weights = torch.nn.functional.softmax(scaled_attention_logits, dim=-1) # [..., seq_len_q, seq_len_k]
output = torch.matmul(attention_weights, v) # =>[..., seq_len_q, depth_v]
return output, attention_weights # [..., seq_len_q, depth_v], [..., seq_len_q, seq_len_k]
class MultiHeadAttention(torch.nn.Module):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.d_model = d_model
assert d_model % self.num_heads == 0 #因为输入要被(平均?)split到不同的head
self.depth = d_model//self.num_heads #512/8=64,所以在scaled dot-product atten中dq=dk=64,dv也是64
self.wq = torch.nn.Linear(d_model, d_model)
self.wk = torch.nn.Linear(d_model, d_model)
self.wv = torch.nn.Linear(d_model, d_model)
self.final_linear = torch.nn.Linear(d_model, d_model)
def split_heads(self, x, batch_size): # x [b, seq_len, d_model]
x = x.view(batch_size, -1, self.num_heads,
self.depth) # [b, seq_len, d_model=512]=>[b, seq_len, num_head=8, depth=64]
return x.transpose(1, 2) # [b, seq_len, num_head=8, depth=64]=>[b, num_head=8, seq_len, depth=64]
def forward(self, q, k, v, mask): # q=k=v=x [b, seq_len, embedding_dim] embedding_dim其实也=d_model
batch_size = q.shape[0]
q = self.wq(q) # => [b, seq_len, d_model]
k = self.wk(k) # => [b, seq_len, d_model]
v = self.wv(v) # => [b, seq_len, d_model]
q = self.split_heads(q, batch_size) # => [b, num_head=8, seq_len, depth=64]
k = self.split_heads(k, batch_size) # => [b, num_head=8, seq_len, depth=64]
v = self.split_heads(v, batch_size) # => [b, num_head=8, seq_len, depth=64]
scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
# => [b, num_head=8, seq_len_q, depth=64], [b, num_head=8, seq_len_q, seq_len_k]
scaled_attention = scaled_attention.transpose(1, 2) # =>[b, seq_len_q, num_head=8, depth=64]
# 转置操作让张量存储结构扭曲,直接使用view方法会失败,可以使用reshape方法
concat_attention = scaled_attention.reshape(batch_size, -1, self.d_model) # =>[b, seq_len_q, d_model=512]
output = self.final_linear(concat_attention) # =>[b, seq_len_q, d_model=512]
return output, attention_weights # [b, seq_len_q, d_model=512], [b, num_head=8, seq_len_q, seq_len_k]
# 点式前馈网络
def point_wise_feed_forward_network(d_model, dff):
feed_forward_net = torch.nn.Sequential(
torch.nn.Linear(d_model, dff), # [b, seq_len, d_model]=>[b, seq_len, dff=2048]
torch.nn.ReLU(),
torch.nn.Linear(dff, d_model), # [b, seq_len, dff=2048]=>[b, seq_len, d_model=512]
)
return feed_forward_net
class EncoderLayer(torch.nn.Module):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super(EncoderLayer, self).__init__()
self.mha = MultiHeadAttention(d_model, num_heads) # 多头注意力(padding mask)(self-attention)
self.ffn = point_wise_feed_forward_network(d_model, dff)#前馈神经网络层
self.layernorm1 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
self.layernorm2 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
self.dropout1 = torch.nn.Dropout(rate)
self.dropout2 = torch.nn.Dropout(rate)
# x [b, inp_seq_len, embedding_dim] embedding_dim其实也=d_model
# mask [b,1,1,inp_seq_len]
def forward(self, x, mask):
attn_output, _ = self.mha(x, x, x, mask) # =>[b, seq_len, d_model] self-attention
attn_output = self.dropout1(attn_output)
out1 = self.layernorm1(x + attn_output) # 残差&层归一化 =>[b, seq_len, d_model] 残差网络思路
ffn_output = self.ffn(out1) # =>[b, seq_len, d_model]
ffn_output = self.dropout2(ffn_output)
out2 = self.layernorm2(out1 + ffn_output) # 残差&层归一化 =>[b, seq_len, d_model]
return out2 # [b, seq_len, d_model]
class DecoderLayer(torch.nn.Module):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super(DecoderLayer, self).__init__()
self.mha1 = MultiHeadAttention(d_model,
num_heads) # masked的多头注意力(look ahead mask 和 padding mask)(self-attention)
self.mha2 = MultiHeadAttention(d_model, num_heads) # 多头注意力(padding mask)(encoder-decoder attention)
self.ffn = point_wise_feed_forward_network(d_model, dff)
self.layernorm1 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
self.layernorm2 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
self.layernorm3 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
self.dropout1 = torch.nn.Dropout(rate)
self.dropout2 = torch.nn.Dropout(rate)
self.dropout3 = torch.nn.Dropout(rate)
# x [b, targ_seq_len, embedding_dim] embedding_dim其实也=d_model=512
# look_ahead_mask [b, 1, targ_seq_len, targ_seq_len] 这里传入的look_ahead_mask应该是已经结合了look_ahead_mask和padding mask的mask
# enc_output [b, inp_seq_len, d_model]
# padding_mask [b, 1, 1, inp_seq_len]
def forward(self, x, enc_output, look_ahead_mask, padding_mask):
attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask) # =>[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len] self-attention,训练阶段用于编码结果
attn1 = self.dropout1(attn1)
out1 = self.layernorm1(x + attn1) # 残差&层归一化 [b, targ_seq_len, d_model]
# Q: receives the output from decoder\'s first attention block,即 masked multi-head attention sublayer
# K V: V (value) and K (key) receive the encoder output as inputs
attn2, attn_weights_block2 = self.mha2(out1, enc_output, enc_output,
padding_mask) # =>[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, inp_seq_len]
attn2 = self.dropout2(attn2)
out2 = self.layernorm2(out1 + attn2) # 残差&层归一化 [b, targ_seq_len, d_model]
ffn_output = self.ffn(out2) # =>[b, targ_seq_len, d_model]
ffn_output = self.dropout3(ffn_output)
out3 = self.layernorm3(out2 + ffn_output) # 残差&层归一化 =>[b, targ_seq_len, d_model]
return out3, attn_weights_block1, attn_weights_block2
#[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len], [b, num_heads, targ_seq_len, inp_seq_len]
class Encoder(torch.nn.Module):
def __init__(self,
num_layers, # N个encoder layer
d_model, # 当前网络的维度
num_heads,
dff, # 点式前馈网络内层fn的维度
input_vocab_size, # 输入词表大小(源语言(法语))
maximun_position_encoding,
rate=0.1):
super(Encoder, self).__init__()
self.num_layers = num_layers
self.d_model = d_model
self.embedding = torch.nn.Embedding(num_embeddings=input_vocab_size, embedding_dim=d_model)#可以对应看transformer的图,这个是编码器的词向量层
self.pos_encoding = positional_encoding(maximun_position_encoding,
d_model).to(device) #在这个相当于关系图中编码的词嵌入层 =>[1, max_pos_encoding, d_model=512] # 相当于作者手动实现了pos_emb
# self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate).cuda() for _ in range(num_layers)] # 不行
#对num_layers个encoder进行堆叠
self.enc_layers = torch.nn.ModuleList([EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)])
self.dropout = torch.nn.Dropout(rate)
# x [b, inp_seq_len]
# mask [b, 1, 1, inp_sel_len]
def forward(self, x, mask):
inp_seq_len = x.shape[-1]
# adding embedding and position encoding
x = self.embedding(x) # [b, inp_seq_len]=>[b, inp_seq_len, d_model]
# 缩放 embedding 原始论文的3.4节有提到: In the embedding layers, we multiply those weights by \\sqrtd_model.
x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32))
pos_encoding = self.pos_encoding[:, :inp_seq_len, :]
#pos_encoding = pos_encoding.cuda() #调用了显卡资源
x += pos_encoding #将得到的位置编码于原来数据的输入进行相加得到最终的输入 [b, inp_seq_len, d_model]
x = self.dropout(x)
for i in range(self.num_layers):#将每一层decoder的输出当作下一层decoder的输入
x = self.enc_layers[i](x, mask) # [b, inp_seq_len, d_model]=>[b, inp_seq_len, d_model]
return x # [b, inp_seq_len, d_model]
class Decoder(torch.nn.Module):
def __init__(self,
num_layers, # N个encoder layer
d_model,
num_heads,
dff, # 点式前馈网络内层fn的维度
target_vocab_size, # target词表大小(目标语言(英语))
maximun_position_encoding,
rate=0.1):
super(Decoder, self).__init__()
self.num_layers = num_layers
self.d_model = d_model
self.embedding = torch.nn.Embedding(num_embeddings=target_vocab_size, embedding_dim=d_model)
self.pos_encoding = positional_encoding(maximun_position_encoding,
d_model).to(device) # =>[1, max_pos_encoding, d_model=512]
# self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate).cuda() for _ in range(num_layers)] # 不行
self.dec_layers = torch.nn.ModuleList([DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]) # decoder比encoder多了一组attention层
self.dropout = torch.nn.Dropout(rate)
# x [b, targ_seq_len]
# look_ahead_mask [b, 1, targ_seq_len, targ_seq_len] 这里传入的look_ahead_mask应该是已经结合了look_ahead_mask和padding mask的mask
# enc_output [b, inp_seq_len, d_model]
# padding_mask [b, 1, 1, inp_seq_len]
def forward(self, x, enc_output, look_ahead_mask, padding_mask):
targ_seq_len = x.shape[-1]
attention_weights =
# adding embedding and position encoding
x = self.embedding(x) # [b, targ_seq_len]=>[b, targ_seq_len, d_model]
# 缩放 embedding 原始论文的3.4节有提到: In the embedding layers, we multiply those weights by \\sqrtd_model.
x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32))
# x += self.pos_encoding[:, :targ_seq_len, :] # [b, targ_seq_len, d_model]
pos_encoding = self.pos_encoding[:, :targ_seq_len, :] # [b, targ_seq_len, d_model]
#pos_encoding = pos_encoding.cuda() #调用显卡资源
x += pos_encoding # [b, inp_seq_len, d_model]
x = self.dropout(x)
for i in range(self.num_layers):
x, attn_block1, attn_block2 = self.dec_layers[i](x, enc_output, look_ahead_mask, padding_mask) # 因为是解码器,需要将编码器的输出结果enc_output传入
# => [b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len], [b, num_heads, targ_seq_len, inp_seq_len]
attention_weights[f\'decoder_layeri + 1_block1\'] = attn_block1
attention_weights[f\'decoder_layeri + 1_block2\'] = attn_block2
return x, attention_weights
class Transformer(torch.nn.Module):
def __init__(self,
num_layers, # N个encoder layer
d_model,
num_heads,
dff, # 点式前馈网络内层fn的维度
input_vocab_size, # input此表大小(源语言(法语))
target_vocab_size, # target词表大小(目标语言(英语))
pe_input, # input max_pos_encoding
pe_target, # input max_pos_encoding
rate=0.1):
super(Transformer, self).__init__()
self.encoder = Encoder(num_layers,# Encoder中的EncoderLayer层数
d_model,#模型的维度大小,即每个词语的向量维度大小
num_heads,#多头注意力机制中head的数量。
dff,#前向网络中的隐层大小
input_vocab_size,
pe_input,
rate)
self.decoder = Decoder(num_layers,
d_model,
num_heads,
dff,
target_vocab_size,
pe_target,
rate)
self.final_layer = torch.nn.Linear(d_model, target_vocab_size)
# inp [b, inp_seq_len]
# targ [b, targ_seq_len]
# enc_padding_mask [b, 1, 1, inp_seq_len]
# look_ahead_mask [b, 1, targ_seq_len, targ_seq_len]
# dec_padding_mask [b, 1, 1, inp_seq_len] # 注意这里的维度是inp_seq_len
def forward(self, inp, targ, enc_padding_mask, look_ahead_mask, dec_padding_mask):
enc_output = self.encoder(inp, enc_padding_mask) # =>[b, inp_seq_len, d_model]
dec_output, attention_weights = self.decoder(targ, enc_output, look_ahead_mask, dec_padding_mask)
# => [b, targ_seq_len, d_model],
# \'..block1\': [b, num_heads, targ_seq_len, targ_seq_len],
# \'..block2\': [b, num_heads, targ_seq_len, inp_seq_len], ...
final_output = self.final_layer(dec_output) # =>[b, targ_seq_len, target_vocab_size]
return final_output, attention_weights
以上是关于Transformer详解的主要内容,如果未能解决你的问题,请参考以下文章