Transformer:Embedding
《Attention Is All You Need》里的Transformer架构
先从 Transformer 的 Embedding 部分写起。当我们输入文本到 Transformer 网络中时,模型接收的是存储着数值的向量而不是文字,因而第一步要做的事就是将这个 text 转化为词向量。
1. Tokenizer 处理得到词索引
假定我们输入的 text 文本是一个包含许多单词的句子,我们需要想办法用数值表达这个句子,一种做法就是对每一个单词取一个唯一的索引值,用这个数值来表示这个句子。这些索引映射关系存起来就是词表。
- 当然分词粒度也不一定就是一个单词。只是说单词分词法(word base)最直观。单词分词法将一个word作为最小元,也就是根据空格或者标点分词。最详尽的分词是单字分词法(character-base)。单字分词法会穷举所有出现的字符,所以是最完整的。另外还有一种最常用的、介于两种方法之间的分词法叫子词分词法,会把一个句子分成最小可分的子词例如[‘To’, ‘day’, ‘is’, ‘S’, ‘un’, ‘day’]。
比如 text = “Today is a nice day!”,token = tokenizer(text),那么处理后得到的 tokens 就是一个字典,其中包含了 “input_ids” 词索引序列:’input_ids’: [[101, 2769, 3221, …, 102, 0, 0]]。0 是 padding 的填充项(补全 seq length 长度)
2. Token_Embedding 词嵌入处理
经过 tokenizer 分词处理后,我们的输入 x 就变为一个形状 [batch_size, seq_len] 的向量,例如 [[101, 2054, 2003, 2026, 3793, 102], [101, 2054, 2064, 2017, 102, 0]] 表示输入 batch_size = 2 两个句子。
那么 embedding 操作就是把输入向量升维。我们输入的 x 对一个单词的表示现在相当于是一个索引值,而我们要把每一个单词都变成一个唯一的向量模型才能学习到其空间嵌入特征。具体而言,假设我们有以下配置:
批次大小 (batch_size) = 2
序列长度 (seq_len) = 4
嵌入维度 (d_model) = 6
词汇表大小 (vocab_size) = 10000
x = [
[101, 2054, 2003, 102], # 第一个句子的词索引
[101, 5243, 3122, 102] # 第二个句子的词索引
]
嵌入后的张量可能就是这样,形状变为 [2,4,6] (batch_size, seq_len, d_model)
token_embedding = [
[ # 第一个句子
[0.1, -0.2, 0.3, 0.1, -0.5, 0.7], # 词索引101的嵌入向量
[0.5, 0.2, -0.3, 0.4, 0.1, -0.2], # 词索引2054的嵌入向量
[-0.3, 0.1, 0.5, -0.2, 0.4, 0.3], # 词索引2003的嵌入向量
[0.2, -0.4, -0.1, 0.5, 0.3, 0.1] # 词索引102的嵌入向量
],
[ # 第二个句子
[0.1, -0.2, 0.3, 0.1, -0.5, 0.7], # 词索引101的嵌入向量
[0.4, 0.3, 0.2, -0.1, -0.3, 0.5], # 词索引5243的嵌入向量
[-0.2, 0.5, 0.1, 0.3, -0.4, 0.2], # 词索引3122的嵌入向量
[0.2, -0.4, -0.1, 0.5, 0.3, 0.1] # 词索引102的嵌入向量
]
]
嵌入转化的原理实际就是一个查找表。我们首先把每个单词的词索引转化成 one-hot 向量,那么可以想象这个向量维数应该很大,因此再降维。从每个单词的 one-hot 向量经过 Embedding 矩阵得到降维后的结果。比如有三个单词, one-hot 处理后的结果是 $36$ 大小的向量,那么经过一个 $64$ 大小的权重矩阵就可以乘出 $3*4$ 大小的结果从而降维(d_model = 4)。因此这个嵌入矩阵的形状就应该是[vocab_size, d_model] 大小。
继承 torch 里的 Embedding 类,forward 的时候把输入 x 转换为嵌入向量。具体实现:
# 将输入的词汇表索引转换为指定维度的Embedding
# 每个token的词索引升维到d_model维,padding_idx=1表示填充词的索引为1
# 继承nn.Embedding在训练中前向传播,反向传播,更新参数
class TokenEmbedding(nn.Embedding):
def __init__(self, vocab_size, d_model):
super(TokenEmbedding, self).__init__(vocab_size, d_model, padding_idx=1)
2. Position_Embedding 位置编码
Transformer 架构引入了位置编码,为每个单词向量生成一个固定的位置向量,来表达其位置相对关系。
这部分是固定计算的,对于每个 seq_len 长度的句子输入,会返回一个 [seq_len, d_model] 形状即符合当前序列长度的位置编码。把词嵌入和位置编码相加才是最终的词向量。
# 通过位置编码计算输入每个词生成的正弦余弦位置编码
# 创建的是固定不变的位置编码,在训练中不更新,直接基于公式计算这个序列长度的位置编码矩阵
class PositionalEmbedding(nn.Module):
def __init__(self, d_model, max_len, device):
super(PositionalEmbedding, self).__init__()
# 初始化一个大小为(max_len, d_model)的零矩阵
self.encoding = torch.zeros(max_len, d_model, device=device)
self.encoding.requires_grad = False
pos = torch.arange(0, max_len, device=device)
pos = pos.float().unsqueeze(dim=1)
_2i = torch.arange(0, d_model, step=2, device=device).float()
self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / d_model)))
self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / d_model)))
def forward(self, x):
# x 形状: [batch_size, seq_len],也就是词索引
batch_size, seq_len = x.size()
# 返回适合当前序列长度的位置编码
return self.encoding[:seq_len, :]
3. Transformer_Embedding
最后把词嵌入和位置编码合并,即直接相加。(这里在嵌入层神经网络应用一个 dropout 防止过拟合)
# 嵌入层的输入是经过tokenizer处理后的词索引,输出是词的Embedding和位置编码
class TransformerEmbedding(nn.Module):
def __init__(self, vocab_size, d_model, max_len, drop_prob, device):
super(TransformerEmbedding, self).__init__()
self.token_embedding = TokenEmbedding(vocab_size, d_model)
self.positional_embedding = PositionalEmbedding(d_model, max_len, device)
self.dropout = nn.Dropout(p=drop_prob)
def forward(self, x):
token_embedding = self.token_embedding(x)
positional_embedding = self.positional_embedding(x)
return self.dropout(token_embedding + positional_embedding)
Transformer:Multi-Attention
《Attention Is All You Need》里的Transformer架构
1. Self-Attention
自注意力的作用:随着模型处理输入序列的每个单词,自注意力会关注整个输入序列的所有单词,帮助模型对本单词更好地进行编码。在处理过程中,自注意力机制会将对所有相关单词的理解融入到我们正在处理的单词中。更具体的功能如下:
序列建模:自注意力可以用于序列数据(例如文本、时间序列、音频等)的建模。它可以捕捉序列中不同位置的依赖关系,从而更好地理解上下文。这对于机器翻译、文本生成、情感分析等任务非常有用。
并行计算:自注意力可以并行计算,这意味着可以有效地在现代硬件上进行加速。相比于RNN和CNN等序列模型,它更容易在GPU和TPU等硬件上进行高效的训练和推理。(因为在自注意力中可以并行的计算得分)
长距离依赖捕捉:传统的循环神经网络(RNN)在处理长序列时可能面临梯度消失或梯度爆炸的问题。自注意力可以更好地处理长距离依赖关系,因为它不需要按顺序处理输入序列。
自注意力的计算:
Self-Attention 是通过输入本身的序列作为 Q 和 K 计算注意力分数的。从每个编码器的输入向量(每个单词的词向量,即Embedding,可以是任意形式的词向量,比如说word2vec,GloVe,one-hot编码)中生成三个向量,即查询向量、键向量和一个值向量。(这三个向量是通过词嵌入与三个权重矩阵相乘后创建出来的)
参考 CSDN @告白气球 的这张图:
经过上一节 Embedding 层后的嵌入输出维度为 (batch_size, seq_len, d_model),通过 nn.Linear 线性变换为 k, q, v 向量。然后经过注意力分数计算的公式即可得到注意力 score:q 和 k 先相乘,得到 (batch_size, num_heads, seq_len, seq_len) 的score矩阵,矩阵表示序列中每个位置对所有其他位置的注意力权重;softmax 之后与 v 相乘,score 的形状变为 **[batch_size, num_heads, seq_len, n_d]**。
2. Multi-Head
self-attention只是使用了一组 WQ、WK、WV 来进行变换得到查询、键、值矩阵,而 Multi-Head Attention 使用多组WQ,WK,WV得到多组查询、键、值矩阵,然后每组分别计算得到一个 Z 矩阵。
将所有注意力头 Z 矩阵拼接起来,得到的就是合并了所有头的注意力信息。展平 num_heads 维度,得到 [batch_size, seq_len, d_model] 形状的 score。
最后用一个附加的权重矩阵 W^O 乘以 score,将展平的score通过线性层投影回 d_model 维度,并加上偏置。这个线性变换将合并后的表示投影到一个更有意义的空间。
总结整个流程就是:
最后附上带有详细注释的代码,流程应该很清楚:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
import numpy as np
import random
import math
from torch import Tensor
# batch_size, seq_len, d_model
x = torch.rand(128, 32, 512)
class MultiHeadAttention(nn.Module):
def __init__(self, d_model: int, num_heads: int):
super(MultiHeadAttention, self).__init__()
self.d_model = d_model
self.num_heads = num_heads
# 线性层的权重矩阵形状为(d_model, d_model)
# 输入的形状为(batch_size, seq_len, d_model),线性变换(wx+b)后得到k,q,v
# 不过一般新向量在维度上往往比词嵌入向量更低
# W_q:学习"我应该关注什么"的特征表示
# W_k:学习"我能提供什么信息"的特征表示
# W_v:学习"我的实际内容是什么"的特征表示
self.w_q = nn.Linear(d_model, d_model)
self.w_k = nn.Linear(d_model, d_model)
self.w_v = nn.Linear(d_model, d_model)
self.w_o = nn.Linear(d_model, d_model)
self.softmax = nn.Softmax(dim=-1)
def forward(self, q, k, v, mask=None):
batch_size, seq_len, d_model = q.size()
n_d = self.d_model // self.num_heads
# 将输入向量x线性变换为q,k,v
q,k,v = self.w_q(q), self.w_k(k), self.w_v(v)
# 这里通过view操作将d_model维度拆分为num_heads个头,每个头的大小为n_d
# 然后通过permute操作将头维度移到前面,变成(batch_size, num_heads, seq_len, n_d)
# 目的:1.减少计算量,并行计算;2.每个头可以关注不同特征子空间
q = q.view(batch_size, seq_len, self.num_heads, n_d).permute(0, 2, 1, 3)
k = k.view(batch_size, seq_len, self.num_heads, n_d).permute(0, 2, 1, 3)
v = v.view(batch_size, seq_len, self.num_heads, n_d).permute(0, 2, 1, 3)
# 这里k.transpose(2,3)将k的维度从(batch_size, num_heads, seq_len, n_d)变为(batch_size, num_heads, n_d, seq_len)
# 这样q和k相乘后,得到(batch_size, num_heads, seq_len, seq_len)的score矩阵
# 矩阵表示序列中每个位置对所有其他位置的注意力权重
# score[b, h, i, j] 表示在批次 b 的第 h 个头中,位置 i 对位置 j 的注意力分数
score = q@k.transpose(2,3) / math.sqrt(n_d)
# 将 mask 中值为0的位置在 score 中填充为一个很大的负数 -1e9
# 在编码器自注意力中,通常不使用掩码,允许每个位置关注所有位置
# 在解码器自注意力中,使用掩码确保每个位置只能关注其前面的位置
# 在填充掩码中,用于忽略填充标记的影响
if mask is not None:
score = score.masked_fill(mask == 0, -1e9)
# self.softmax(score) 将注意力分数转换为概率分布(每行和为1)
# score 的形状变为 [batch_size, num_heads, seq_len, n_d]
score = self.softmax(score)@v
# 将 score 的维度转换为 [batch_size, seq_len, num_heads, n_d]
# 然后展平 num_heads 维度,得到 [batch_size, seq_len, d_model]
score = score.permute(0,2,1,3).contiguous().view(batch_size, seq_len, d_model)
# 将展平的score通过线性层投影回d_model维度,并加上偏置
# 这个线性变换将合并后的表示投影到一个更有意义的空间
out = self.w_o(score)
return out
Transformer:Encoder & Decoder
《Attention Is All You Need》里的Transformer架构
1. Encoder
Encoder Layer 由一个多头注意力层和一个前馈网络层组成。输入的嵌入向量经过多头注意力机制捕获语义特征长距离依赖,然后经过一层残差连接+归一化,通过前馈网络层映射到高维空间捕获多维特征,最后再次残差连接+归一化输出。
分析一下 Encoder 部分的张量变化:
【Embedding】
- 输入张量形状:[batch_size, seq_length]
- Embedding 变换后:[batch_size, seq_length, d_model]
【Multi-Attention】
- 将输入分为 num_heads 个头,Q、K、V 矩阵形状:[batch_size, seq_length, d_model / num_heads]
- 注意力计算后合并注意力:[batch_size, seq_length, d_model]
【Add & Norm】
将注意力输出与原始输入相加通过归一化层,计算残差损失
输出形状保持:[batch_size, seq_length, d_model]
【FFN】
- 线性映射到高维(hidden_dim) + ReLU激活 + 线性映射回低维(d_model)
最终输出:[batch_size, seq_length, d_model],也就是和 Embedding 变换后的维度对齐。
借助 pytorch 的代码实现:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
import numpy as np
import random
import math
from torch import Tensor
import os
import multi_attention
import layernorm
import embedding
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, hidden_dim, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
x = self.fc1(x)
x = F.relu(x)
x = self.dropout(x)
x = self.fc2(x)
return x
class EncoderLayer(nn.Module):
def __init__(self, d_model, hidden_dim, num_heads, dropout=0.1):
super(EncoderLayer, self).__init__()
self.attn = multi_attention.MultiHeadAttention(d_model, num_heads)
self.norm1 = layernorm.LayerNorm(d_model)
self,dropout1 = nn.Dropout(dropout)
self.ffn = PositionwiseFeedForward(d_model, hidden_dim, dropout)
self.norm2 = layernorm.LayerNorm(d_model)
self.dropout2 = nn.Dropout(dropout)
def forward(self, x, mask=None):
_x = x
x = self.attn(x, x, x, mask)
x = self.dropout1(x)
# 残差连接
x = self.norm1(x + _x)
_x = x
x = self.ffn(x)
x = self.dropout2(x)
x = self.norm2(x + _x)
return x
class Encoder(nn.Module):
def __init__(self, voc_size, max_len, d_model, hidden_dim, num_heads, num_layers, dropout=0.1, device=None):
super(Encoder, self).__init__()
self.embed = embedding.Embedding(voc_size, d_model, max_len, dropout, device)
self.layers = nn.ModuleList([EncoderLayer(d_model, hidden_dim, num_heads, dropout) for _ in range(num_layers)])
def forward(self, x, mask=None):
x = self.embed(x)
for layer in self.layers:
x = layer(x, mask)
return x
2. Decoder
解码器包含自注意力与交叉注意力两部分。
编码器的输出被用作交叉注意力部分的 K 和 V 矩阵(键值向量),Q 矩阵(查询向量)则是自注意力层的输出。交叉注意力层会对编码结果进行注意力调整,计算编码结果与解码输入间的注意关系,以获得与当前解码位置相关的编码器信息。
在 Multi-Attention 里我们可以看到注意力分数可以应用对应的 mask,mask 矩阵对应值为0的部分,在注意力矩阵将其值设定为无穷小。这样在 Softmax 操作后,这些部分对应的注意力分数就变为0,不被注意到。
在 Encoder 里面我们只有 **padding mask (s_mask)**,用来标识输入序列的最大长度,不够最大长度的部分补0。而在 Decoder 里还存在一种 mask,也就是 **sequence mask (t_mask)**,用于解码器的自注意力层,确保预测第 i 个位置的token时只能看到位置 i 之前的信息。
前馈网络层和编码器实现一致。每一层后面都添加一层 Add&Norm 与 dropout。
最终输出经过一层线性变换,然后Softmax得到输出的概率分布(softmax层会把向量变成概率),然后通过词典,输出概率最大的对应的单词作为我们的预测输出。
借助 pytorch 的代码实现:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
import numpy as np
import random
import math
from torch import Tensor
import os
import multi_attention
import layernorm
import embedding
class DecoderLayer(nn.Module):
def __init__(self, d_model, ffn_hidden, num_heads, dropout=0.1):
super(DecoderLayer, self).__init__()
self.attn1 = multi_attention.MultiHeadAttention(d_model, num_heads)
self.norm1 = layernorm.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.cross_attn = multi_attention.MultiHeadAttention(d_model, num_heads)
self.norm2 = layernorm.LayerNorm(d_model)
self.dropout2 = nn.Dropout(dropout)
self.ffn = embedding.PositionwiseFeedForward(d_model, ffn_hidden, dropout)
self.norm3 = layernorm.LayerNorm(d_model)
self.dropout3 = nn.Dropout(dropout)
# t_mask是目标序列的掩码,s_mask是源序列的掩码
# enc是编码器输出,dec是解码器输出
def forward(self, dec, enc, t_mask, s_mask):
_x = dec
x = self.attn1(_x, _x, _x, t_mask)
x = self.dropout1(x)
x = self.norm1(x + _x)
_x = x
x = self.cross_attn(x, enc, enc, s_mask)
x = self.dropout2(x)
x = self.norm2(x + _x)
x = self.ffn(x)
x = self.dropout3(x)
x = self.norm3(x + _x)
return x
class Decoder(nn.Module):
def __init__(self, dec_voc_size, max_len, d_model, hidden_dim, num_heads, num_layers, dropout=0.1, device=None):
super(Decoder, self).__init__()
self.embed = embedding.TransformerEmbedding(dec_voc_size, d_model, max_len, dropout, device)
self.layers = nn.ModuleList([DecoderLayer(d_model, hidden_dim, num_heads, dropout) for _ in range(num_layers)])
self.fc = nn.Linear(d_model, dec_voc_size)
def forward(self, dec, enc, t_mask, s_mask):
dec = self.embed(dec)
for layer in self.layers:
dec = layer(dec, enc, t_mask, s_mask)
return self.fc(dec)
Transformer:Conbination Transformer Layers
《Attention Is All You Need》里的Transformer架构
之前已经写好了 transformer 的各个模块,这里最后合并一下各个组件,组合成最后的 transformer layer!
__init__() 参数说明
- src_pad_idx:源序列表示 padding 的 idx
- tgt_pad_idx:目标序列表示 padding 的 idx
- src_voc_size:源语言词汇表大小
- tgt_voc_size:目标语言词汇表大小
- max_len:序列最大长度
- d_model:词嵌入维度
- hidden_dim:前馈神经网络隐藏层维度
- num_heads:多头注意力的头数
- num_layers:编码器和解码器的层数
- dropout=0.1:dropout比率
- device=None:使用设备
掩码mask预处理
我们知道,在 Encoder 和 Decoder 里,mask 操作是必须的一环。mask 种类有两种,一种是处理填充序列的 padding mask,一种是处理因果序列的 casual mask,其中 padding mask 在编码器-解码器的 self-attention 和 cross-attention 里都会用到,而 casual mask 则用于解码器的 self-attention 部分。
casual mask 的定义:对于每个序列,我们通过定义一个对应的下三角矩阵作为 mask。在计算序列第n个词的 attention score 的时候,只有前 n-1 个 token 参与计算。
padding mask 的定义:二值矩阵,等于 pad_idx 的部分记为 0,不等于的部分记为 1.
需要注意的是,mask 的掩码操作在注意力矩阵应用 softmax 操作之前。这个流程是:
- Q 查询矩阵与 K 键矩阵转置相乘,[batch_size, num_heads, seq_q, d_k] 与 [batch_size, num_heads, d_k, seq_k] 相乘,得到的矩阵形状为 [batch_size, num_heads, seq_q, seq_k]
- 除以根号 d 进行缩放操作
- 接下来在注意力分数矩阵上应用掩码,因此 mask 的形状须是一个四维矩阵。在生成 pad_mask 的时候,我们首先用源序列和目标序列创建基础 mask,q_mask = (q != pad_idx),k_mask = (k != pad_idx),shape 为 [batch_size, seq_q / seq_k],接下来拓展维度使其与查询矩阵与键矩阵相匹配,最后将 q_mask & k_mask,得到的最终 mask 为 [batch_size, 1, seq_q, seq_k],最后应用掩码时通过广播机制匹配到所有注意力头上。对于 casual_mask,创建 [seq_q, seq_k] 形状的下三角矩阵,然后拓展维度与 pad_mask 按元素乘,得到最终的 mask
- softmax 操作
forward
Encoder 只需要传递源序列,Decoder 需要传递 encoder 计算的结果与目的序列(做 cross-attention),在前向过程中,先通过 encoder 计算出每一层 Encoder Layer 对于源序列的输出,然后将每一层的编码器输出传递给每一层的解码器,解码器在进行计算,得到最终的解码输出。
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
import numpy as np
import random
import math
from torch import Tensor
import os
import multi_attention
import layernorm
import embedding
import encoder
import decoder
class Transformer(nn.Module):
# src_pad_idx: 源序列的填充索引
# tgt_pad_idx: 目标序列的填充索引
# 填充索引是单个数值,在词表中用来表示填充符号padding token的索引值,通常设为0或-1
# src_voc_size: 源序列的词汇表大小
# tgt_voc_size: 目标序列的词汇表大小
# max_len: 序列的最大长度
# d_model: 词嵌入的维度
def __init__(self, src_pad_idx, tgt_pad_idx, src_voc_size, tgt_voc_size, max_len, d_model, hidden_dim, num_heads, num_layers, dropout=0.1, device=None):
super(Transformer, self).__init__()
self.encoder = encoder.Encoder(src_voc_size, max_len, d_model, hidden_dim, num_heads, num_layers, dropout, device)
self.decoder = decoder.Decoder(tgt_voc_size, max_len, d_model, hidden_dim, num_heads, num_layers, dropout, device)
self.src_pad_idx = src_pad_idx
self.tgt_pad_idx = tgt_pad_idx
self.device = device
def make_pad_mask(self, q, k, pad_idx_q, pad_idx_k):
len_q, len_k = q.size(1), k.size(1)
# 先用ne()操作转换为二值矩阵,然后通过unsqueeze()操作在q的维度上扩展维度
# 最后通过repeat()操作将q的维度扩展为(batch_size, 1, len_q, len_k)
# 这个操作是为了将mask矩阵的维度和score分数矩阵的维度对齐
q = q.ne(pad_idx_q).unsqueeze(1).unsqueeze(3)
q = q.repeat(1, 1, 1, len_k)
k = k.ne(pad_idx_k).unsqueeze(1).unsqueeze(2)
k = k.repeat(1, 1, len_q, 1)
mask = q & k
return mask
def make_casual_mask(self, q, k):
mask = torch.tril(torch.ones(q.size(1), k.size(1))).type(torch.BoolTensor).to(self.device)
return mask
def forward(self, src, tgt):
src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)
# 注意*是按元素乘,相当于把两种mask的作用效果叠加了
tgt_mask = self.make_pad_mask(tgt, tgt, self.tgt_pad_idx, self.tgt_pad_idx)*self.make_casual_mask(tgt, tgt)
enc = self.encoder(src, src_mask)
dec = self.decoder(tgt, enc, tgt_mask, src_mask)
return dec