Y1ran/NLP-BERT–ChineseVersion: a walkthrough of this project's code, to understand BERT and the implementation details behind the paper
Original paper: BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding - ACL Anthology
The two main families of NLP tasks:
understanding and classification, e.g. text classification and sentiment analysis
text generation, e.g. machine translation and chatbots
Both rest on the Transformer architecture, but BERT is built on the encoder while GPT is built on the decoder
As the paper puts it, BERT works like a cloze test: it is a bidirectional model
Project overview

```
bert_pytorch/
├── dataset/              # data & vocab
├── model/                # the BERT model itself (core)
│   ├── attention/        # self-attention implementation
│   ├── embedding/        # token / position / segment embeddings
│   ├── utils/            # FFN / LayerNorm / residual structure
│   ├── transformer.py
│   ├── bert.py
│   └── language_model.py
└── trainer/              # pre-training pipeline
```
dataset
Building the BERTDataset class
It turns one line containing a "sentence pair" into the 4 tensors BERT pre-training needs
i.e. the two tasks from the paper:
MLM (Masked Language Model)
NSP (Next Sentence Prediction)
__init__:
```python
class BERTDataset(Dataset):
    def __init__(self, corpus_path, vocab, seq_len, encoding="utf-8", corpus_lines=None, on_memory=True):
        self.vocab = vocab
        self.seq_len = seq_len

        self.on_memory = on_memory
        self.corpus_lines = corpus_lines
        self.corpus_path = corpus_path
        self.encoding = encoding

        with open(corpus_path, "r", encoding=encoding) as f:
            if self.corpus_lines is None and not on_memory:
                for _ in tqdm.tqdm(f, desc="Loading Dataset", total=corpus_lines):
                    self.corpus_lines += 1

            if on_memory:
                self.lines = [line[:-1].split("\t")
                              for line in tqdm.tqdm(f, desc="Loading Dataset", total=corpus_lines)]
                self.corpus_lines = len(self.lines)

        if not on_memory:
            self.file = open(corpus_path, "r", encoding=encoding)
            self.random_file = open(corpus_path, "r", encoding=encoding)

            for _ in range(random.randint(self.corpus_lines if self.corpus_lines < 1000 else 1000)):
                self.random_file.__next__()
```
The corpus file format is: sentence A \t sentence B
on_memory=True means
the whole corpus file is read into memory in one go
and each line becomes [sentence1, sentence2]
This is only suitable for small corpora; a large corpus would put memory under pressure
__getitem__: fetching the raw sentence pair
```python
def __getitem__(self, item):
    t1, t2, is_next_label = self.random_sent(item)
    ...

def random_sent(self, index):
    t1, t2 = self.get_corpus_line(index)

    if random.random() > 0.5:
        return t1, t2, 1
    else:
        return t1, self.get_random_line(), 0

def get_corpus_line(self, item):
    if self.on_memory:
        return self.lines[item][0], self.lines[item][1]
    else:
        line = self.file.__next__()
        if line is None:
            self.file.close()
            self.file = open(self.corpus_path, "r", encoding=self.encoding)
            line = self.file.__next__()

        t1, t2 = line[:-1].split("\t")
        return t1, t2

def get_random_line(self):
    if self.on_memory:
        return self.lines[random.randrange(len(self.lines))][1]

    line = self.file.__next__()
    if line is None:
        self.file.close()
        self.file = open(self.corpus_path, "r", encoding=self.encoding)
        for _ in range(random.randint(self.corpus_lines if self.corpus_lines < 1000 else 1000)):
            self.random_file.__next__()
        line = self.random_file.__next__()
    return line[:-1].split("\t")[1]
```
Whatever happens, it first fetches

```
t1 = the first sentence of line index
t2 = the second sentence of line index
```

get_corpus_line depends on on_memory: if True it indexes the in-memory list directly, otherwise it streams the next line from the file (reopening from the top once the file is exhausted)
Then, depending on a random draw, t2 may be replaced:
50%: the genuine adjacent sentence → label = 1
50%: a random sentence → label = 0
__getitem__: perturbing the sentences
```python
def __getitem__(self, item):
    ...
    t1_random, t1_label = self.random_word(t1)
    t2_random, t2_label = self.random_word(t2)
    ...

def random_word(self, sentence):
    tokens = sentence.split()
    output_label = []

    for i, token in enumerate(tokens):
        prob = random.random()
        if prob < 0.15:
            prob /= 0.15

            if prob < 0.8:
                tokens[i] = self.vocab.mask_index

            elif prob < 0.9:
                tokens[i] = random.randrange(len(self.vocab))

            else:
                tokens[i] = self.vocab.stoi.get(token, self.vocab.unk_index)

            output_label.append(self.vocab.stoi.get(token, self.vocab.unk_index))

        else:
            tokens[i] = self.vocab.stoi.get(token, self.vocab.unk_index)
            output_label.append(0)

    return tokens, output_label
```
An example:

```
sentence = "我 爱 自然 语言 处理"
我 → 10
爱 → 11
自然 → 12
语言 → 13
处理 → 14
[MASK] → 3
[UNK] → 1
```
| position i | token | random.random() | operation (token / output_label) |
|---|---|---|---|
| 0 | 我 | 0.62 | keep: token stays 10, label = 0 |
| 1 | 爱 | 0.04 | masked: token → 3 ([MASK]), label = 11 |
| 2 | 自然 | 0.81 | keep: token stays 12, label = 0 |
| 3 | 语言 | 0.12 | random replacement: token → e.g. 25, label = 13 |
| 4 | 处理 | 0.30 | keep: token stays 14, label = 0 |
The returned tokens: [10, 3, 12, 25, 14], i.e. the text "我 [MASK] 自然 [random word] 处理" — a deliberately perturbed input (a kind of augmentation)
The returned output_label: [0, 11, 0, 13, 0] — it always keeps the original correct answers
```python
def __getitem__(self, item):
    ...
    t1 = [self.vocab.sos_index] + t1_random + [self.vocab.eos_index]
    t2 = t2_random + [self.vocab.eos_index]

    t1_label = [self.vocab.pad_index] + t1_label + [self.vocab.pad_index]
    t2_label = t2_label + [self.vocab.pad_index]

    segment_label = ([1 for _ in range(len(t1))] + [2 for _ in range(len(t2))])[:self.seq_len]
    bert_input = (t1 + t2)[:self.seq_len]
    bert_label = (t1_label + t2_label)[:self.seq_len]

    padding = [self.vocab.pad_index for _ in range(self.seq_len - len(bert_input))]
    bert_input.extend(padding), bert_label.extend(padding), segment_label.extend(padding)
```
Add the special markers (the [CLS] / [SEP] roles, implemented here with <sos> / <eos>) to the perturbed t1/t2 tokens and their labels
forming the input structure: [CLS] sentence A [SEP] sentence B [SEP]
Then attach a segment_label to the sentences — this is the key to how BERT tells the "two sentences" apart
The final output tensors:
```python
def __getitem__(self, item):
    ...
    output = {"bert_input": bert_input,
              "bert_label": bert_label,
              "segment_label": segment_label,
              "is_next": is_next_label}

    return {key: torch.tensor(value) for key, value in output.items()}
```
| key | role |
|---|---|
| bert_input | the input fed to the BERT encoder |
| bert_label | supervision for the MLM loss |
| segment_label | distinguishes sentence A from sentence B |
| is_next | the NSP task label |
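A minimal sketch of how this dataset is typically plugged into a DataLoader (file paths and sizes are hypothetical; it assumes BERTDataset and WordVocab from this repo are importable):

```python
# Sketch only: hypothetical paths, assuming BERTDataset / WordVocab above are importable.
from torch.utils.data import DataLoader

vocab = WordVocab.load_vocab("data/vocab.small")              # hypothetical vocab file
dataset = BERTDataset("data/corpus.small", vocab, seq_len=64)
loader = DataLoader(dataset, batch_size=32, shuffle=True)

batch = next(iter(loader))
print(batch["bert_input"].shape)      # torch.Size([32, 64])
print(batch["segment_label"].shape)   # torch.Size([32, 64])
print(batch["is_next"].shape)         # torch.Size([32])
```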
vocab

```
TorchVocab → generic vocabulary machinery (sorting, stoi/itos)
Vocab      → pins down the special-token semantics BERT needs
WordVocab  → turns "real text" into trainable id sequences
```
Building the TorchVocab class
The docstring:

```python
"""
Defines a vocabulary object used to numericalize a field.
Attributes:
    freqs: a collections.Counter storing the frequency of each token in the data used to build the vocabulary
    stoi: a collections.defaultdict mapping token strings to numerical identifiers
    itos: a list of token strings indexed by their numerical identifiers
"""
```
itos: id → token (index to string)
stoi: token → id (string to index)
It also supports:
capping the vocabulary size via max_size
filtering out low-frequency words via min_freq
pre-seeding special symbols via specials (e.g. <pad>, <unk>), which are placed at the very front of the vocabulary
__init__:
```python
class TorchVocab(object):
    def __init__(self, counter, max_size=None, min_freq=1, specials=['<pad>', '<oov>'],
                 vectors=None, unk_init=None, vectors_cache=None):
        self.freqs = counter
        counter = counter.copy()
        min_freq = max(min_freq, 1)

        self.itos = list(specials)
        for tok in specials:
            del counter[tok]

        max_size = None if max_size is None else max_size + len(self.itos)
```
First put the specials into itos, and remove them from counter
No matter how often they appear in the corpus, they always occupy the lowest ids
max_size is enlarged by the number of specials, so the special symbols are never squeezed out of the vocabulary
```python
def __init__():
    ...
    words_and_frequencies = sorted(counter.items(), key=lambda tup: tup[0])
    words_and_frequencies.sort(key=lambda tup: tup[1], reverse=True)
```
A classic "stable sort" idiom:
first sort the tokens lexicographically
then sort by frequency in descending order (Python's sort is stable)
so with the same data and the same parameters, the vocabulary ids never drift with hash or iteration order
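A tiny demo of what this two-pass sort buys (toy counter, not from the repo):

```python
from collections import Counter

counter = Counter({"b": 3, "a": 3, "c": 1})

items = sorted(counter.items(), key=lambda tup: tup[0])   # alphabetical: a, b, c
items.sort(key=lambda tup: tup[1], reverse=True)          # by frequency; ties keep alphabetical order
print(items)   # [('a', 3), ('b', 3), ('c', 1)] -- deterministic regardless of insertion order
```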
```python
def __init__():
    ...
    for word, freq in words_and_frequencies:
        if freq < min_freq or len(self.itos) == max_size:
            break
        self.itos.append(word)

    self.stoi = {tok: i for i, tok in enumerate(self.itos)}
```
Words are appended to itos one by one according to these rules, and the reverse dictionary stoi is then built from itos
After that, stoi[token] gives the id and itos[id] gives the token back
Building the Vocab class
It binds the generic vocabulary to the special-token semantics BERT expects
```python
class Vocab(TorchVocab):
    def __init__(self, counter, max_size=None, min_freq=1):
        self.pad_index = 0
        self.unk_index = 1
        self.eos_index = 2
        self.sos_index = 3
        self.mask_index = 4
        super().__init__(
            counter,
            specials=["<pad>", "<unk>", "<eos>", "<sos>", "<mask>"],
            max_size=max_size,
            min_freq=min_freq
        )
```
| index | token | role in BERT |
|---|---|---|
| 0 | <pad> | padding (never contributes to any loss) |
| 1 | <unk> | fallback for unknown words |
| 2 | <eos> | sentence end |
| 3 | <sos> | sentence start |
| 4 | <mask> | the MLM mask token |
Building the WordVocab class
```python
class WordVocab(Vocab):
    def __init__(self, texts, max_size=None, min_freq=1):
        counter = Counter()
        for line in texts:
            if isinstance(line, list):
                words = line
            else:
                words = line.replace("\n", "").replace("\t", "").split()

            for word in words:
                counter[word] += 1
        super().__init__(counter, max_size=max_size, min_freq=min_freq)
```
The input texts can take either of two forms
already tokenized:

```python
texts = [
    ["我", "爱", "自然", "语言"],
    ["语言", "模型"]
]
```

raw strings:

```python
texts = [
    "我 爱 自然 语言",
    "语言 模型"
]
```

Initialization walks the corpus and counts word frequencies
to_seq / from_seq: the most important methods
```python
def to_seq(self, sentence, seq_len=None, with_eos=False, with_sos=False, with_len=False):
    if isinstance(sentence, str):
        sentence = sentence.split()

    seq = [self.stoi.get(word, self.unk_index) for word in sentence]

    if with_eos:
        seq += [self.eos_index]
    if with_sos:
        seq = [self.sos_index] + seq

    origin_seq_len = len(seq)

    if seq_len is None:
        pass
    elif len(seq) <= seq_len:
        seq += [self.pad_index for _ in range(seq_len - len(seq))]
    else:
        seq = seq[:seq_len]

    return (seq, origin_seq_len) if with_len else seq

def from_seq(self, seq, join=False, with_pad=False):
    words = [self.itos[idx]
             if idx < len(self.itos)
             else "<%d>" % idx
             for idx in seq
             if with_pad or idx != self.pad_index]

    return " ".join(words) if join else words
```
The input sentence likewise accepts both forms

```python
seq = [self.stoi.get(word, self.unk_index) for word in sentence]
```

in the vocabulary → its id
not in the vocabulary → <unk>
origin_seq_len = len(seq) is typically used for:
RNNs (true sequence lengths)
attention masks
loss normalization
```python
words = [self.itos[idx]
         if idx < len(self.itos)
         else "<%d>" % idx
         for idx in seq
         if with_pad or idx != self.pad_index]
```
idx < len(self.itos) → a normal word
otherwise → rendered as <12345>, which avoids a crash
with_pad=False → padding tokens are dropped by default (see the round-trip sketch below)
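A quick round-trip sketch on a toy corpus (toy data, assuming WordVocab above is importable; the exact ids depend on the frequency sort, so they are shown only as an example):

```python
texts = ["我 爱 自然 语言", "语言 模型"]
vocab = WordVocab(texts, max_size=None, min_freq=1)

seq = vocab.to_seq("我 爱 模型 处理", seq_len=8, with_sos=True, with_eos=True)
print(seq)                  # e.g. [3, 6, 8, 7, 1, 2, 0, 0] -- "处理" is OOV -> <unk>=1, then padded with 0
print(vocab.from_seq(seq))  # ['<sos>', '我', '爱', '模型', '<unk>', '<eos>'] -- pads dropped by default
```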
The build function:
a standalone entry-point script for building the vocabulary
```python
def build():
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--corpus_path", required=True, type=str)
    parser.add_argument("-o", "--output_path", required=True, type=str)
    parser.add_argument("-s", "--vocab_size", type=int, default=None)
    parser.add_argument("-e", "--encoding", type=str, default="utf-8")
    parser.add_argument("-m", "--min_freq", type=int, default=1)
    args = parser.parse_args()

    with open(args.corpus_path, "r", encoding=args.encoding) as f:
        vocab = WordVocab(f, max_size=args.vocab_size, min_freq=args.min_freq)

    print("VOCAB SIZE:", len(vocab))
    vocab.save_vocab(args.output_path)
```
It builds a WordVocab from the raw corpus and saves this "frozen" vocabulary to disk
Both the training stage and the inference stage then simply call `load_vocab`
model
BERT differs from the vanilla Transformer in that, besides the token and position embeddings, it also adds a segment embedding
embedding

```
token_id    ∈ ℕ
segment_id  ∈ {0, 1, 2}
position_id ∈ {0, 1, 2, ...}
        ↓ each looked up / computed separately
TokenEmbedding      → [B, L, D]
SegmentEmbedding    → [B, L, D]
PositionalEmbedding → [1, L, D]
        ↓ element-wise sum
Embedding Output    → [B, L, D]
```
Building the BERTEmbedding class:
it projects three complementary kinds of discrete information into the same vector space, then sums them
```python
class BERTEmbedding(nn.Module):
    """
    BERT Embedding which is consisted with under features
        1. TokenEmbedding : normal embedding matrix
        2. PositionalEmbedding : adding positional information using sin, cos
        3. SegmentEmbedding : adding sentence segment info, (sent_A:1, sent_B:2)

        sum of all these features are output of BERTEmbedding
    """

    def __init__(self, vocab_size, embed_size, dropout=0.1):
        """
        :param vocab_size: total vocab size
        :param embed_size: embedding size of token embedding
        :param dropout: dropout rate
        """
        super().__init__()
        self.token = TokenEmbedding(vocab_size=vocab_size, embed_size=embed_size)
        self.position = PositionalEmbedding(d_model=self.token.embedding_dim)
        self.segment = SegmentEmbedding(embed_size=self.token.embedding_dim)
        self.dropout = nn.Dropout(p=dropout)
        self.embed_size = embed_size

    def forward(self, sequence, segment_label):
        x = self.token(sequence) + self.position(sequence) + self.segment(segment_label)
        return self.dropout(x)
```
__init__:
vocab_size: the vocabulary size; it determines the number of rows in the embedding lookup table and the valid range of token ids
embed_size: BERT's hidden dimension, the paper's hidden_size
In the paper:

```
BASE  (L=12, H=768,  A=12, Total Parameters=110M)
LARGE (L=24, H=1024, A=16, Total Parameters=340M)
```
TokenEmbedding:

```python
class TokenEmbedding(nn.Embedding):
    def __init__(self, vocab_size, embed_size=512):
        super().__init__(vocab_size, embed_size, padding_idx=0)
```

The key constraint is padding_idx=0: the <pad> row is initialized to zeros and receives no gradient
PositionalEmbedding:

```python
class PositionalEmbedding(nn.Module):

    def __init__(self, d_model, max_len=512):
        super().__init__()

        pe = torch.zeros(max_len, d_model).float()
        pe.require_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]
```
| parameter | meaning |
|---|---|
| d_model | the embedding dimension D |
| max_len | the maximum supported sequence length |

pe.require_grad = False: the positional encodings are not trained (strictly speaking the attribute is spelled requires_grad, but register_buffer already keeps pe out of the model's parameters, so it is not updated either way)
SegmentEmbedding:

```python
class SegmentEmbedding(nn.Embedding):
    def __init__(self, embed_size=512):
        super().__init__(3, embed_size, padding_idx=0)
```

num_embeddings=3 here because
| id | meaning |
|---|---|
| 0 | padding |
| 1 | sentence A |
| 2 | sentence B |
The same token, placed in sentence A versus sentence B, has its embedding shifted as a whole to a different region
| component | input | output | information carried |
|---|---|---|---|
| Token | token id | [B, L, D] | word identity |
| Position | sequence length | [1, L, D] | positional information |
| Segment | segment id | [B, L, D] | sentence identity |
The linear layers and attention further down the Transformer learn on their own how to "decode" this mixture of three kinds of information
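A shape sanity check for the summed embedding (toy sizes, assuming the embedding classes above are importable):

```python
import torch

vocab_size, embed_size, B, L = 100, 32, 2, 8
emb = BERTEmbedding(vocab_size=vocab_size, embed_size=embed_size)

tokens = torch.randint(1, vocab_size, (B, L))                                  # [B, L] token ids (0 reserved for <pad>)
segments = torch.cat([torch.full((B, L // 2), 1, dtype=torch.long),
                      torch.full((B, L // 2), 2, dtype=torch.long)], dim=1)    # first half "sentence A", second half "sentence B"

out = emb(tokens, segments)
print(out.shape)   # torch.Size([2, 8, 32])
```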
attention
Building the MultiHeadedAttention class

```python
class MultiHeadedAttention(nn.Module):
    """
    Take in model size and number of heads.
    """

    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0

        self.d_k = d_model // h
        self.h = h

        self.linear_layers = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(3)])
        self.output_linear = nn.Linear(d_model, d_model)
        self.attention = Attention()

        self.dropout = nn.Dropout(p=dropout)
```
assert d_model % h == 0: every head must get the same number of dimensions
each head works in a d_k-dimensional subspace
Taking BASE as an example:

```
d_model = 768
h = 12
→ d_k = 64
```

self.linear_layers = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(3)]) builds the three linear layers that produce Q / K / V
Each is a single Linear mapping d_model to d_model, whose output is then reshaped into h heads
```python
def forward(self, query, key, value, mask=None):
    batch_size = query.size(0)

    # 1) linear projections, then split into h heads: [B, L, D] -> [B, h, L, d_k]
    query, key, value = [l(x).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
                         for l, x in zip(self.linear_layers, (query, key, value))]

    # 2) scaled dot-product attention over all heads at once
    x, attn = self.attention(query, key, value, mask=mask, dropout=self.dropout)

    # 3) concat the heads back: [B, h, L, d_k] -> [B, L, h*d_k], then a final linear
    x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)

    return self.output_linear(x)
```
In BERT's self-attention the usual sequence of steps is
linear projection + reshape

```python
query, key, value = [
    l(x)
    .view(batch_size, -1, self.h, self.d_k)
    .transpose(1, 2)
    for l, x in zip(self.linear_layers, (query, key, value))
]
```
Scaled dot-product attention:

```python
class Attention(nn.Module):
    """
    Compute 'Scaled Dot Product Attention'
    """

    def forward(self, query, key, value, mask=None, dropout=None):
        scores = torch.matmul(query, key.transpose(-2, -1)) \
                 / math.sqrt(query.size(-1))

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        p_attn = F.softmax(scores, dim=-1)

        if dropout is not None:
            p_attn = dropout(p_attn)

        return torch.matmul(p_attn, value), p_attn
```
The output has shape [B, h, L, d_k]
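For reference, the code above is the standard scaled dot-product attention from Attention Is All You Need:
$$ \operatorname{Attention}(Q, K, V) = \operatorname{softmax}\!\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V $$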
Then the heads are merged back:

```python
x = x.transpose(1, 2) \
     .contiguous() \
     .view(batch_size, -1, self.h * self.d_k)
```

Overall this realizes

```
Embedding output: [B, L, D]
        ↓
MultiHeadAttention
        ↓
[B, L, D]  (same shape, but the content is now "contextualized")
```

letting tokens see each other
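A shape sanity check for the whole multi-head block (toy sizes; assumes MultiHeadedAttention and Attention above are importable):

```python
import torch

B, L, d_model, h = 2, 5, 768, 12
mha = MultiHeadedAttention(h=h, d_model=d_model)

x = torch.randn(B, L, d_model)
mask = torch.ones(B, 1, L, L)        # fully visible: no padding positions masked out

out = mha(x, x, x, mask=mask)
print(out.shape)                     # torch.Size([2, 5, 768]) -- same shape, contextualized content
```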
utils

| file | role |
|---|---|
| feed_forward.py | the FFN (a per-token non-linear transform) |
| gelu.py | the activation function BERT uses |
| layer_norm.py | LayerNorm (normalization over the feature dimension) |
| sublayer.py | a unified wrapper for residual + LayerNorm + dropout |
feed_forward.py
```python
class PositionwiseFeedForward(nn.Module):
    "Implements FFN equation."

    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = GELU()

    def forward(self, x):
        return self.w_2(self.dropout(self.activation(self.w_1(x))))
```
The FFN does not mix information across tokens: its linear layers act only on the last dimension and leave the L dimension untouched, so it provides per-token non-linear expressive power
GELU is chosen because
the attention output is a continuous probabilistic mixture
a hard ReLU would break that continuity
GELU better matches a "probabilistic gating" reading of the activation
gelu.py
```python
class GELU(nn.Module):
    """
    Paper Section 3.4, last paragraph notice that BERT used the GELU instead of RELU
    """

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))
```
The tanh approximation, as proposed by Hendrycks & Gimpel (2016):
$$ \operatorname{GELU}(x) \approx 0.5\,x\left(1+\tanh\left(\sqrt{\tfrac{2}{\pi}}\left(x+0.044715\,x^{3}\right)\right)\right) $$
The exact (erf-based) definition:
$$ \begin{aligned} \operatorname{GELU}(x) &= \frac{x}{2}\left(1+\operatorname{erf}\left(\frac{x}{\sqrt{2}}\right)\right) \\ \operatorname{erf}(x) &= \frac{2}{\sqrt{\pi}} \int_{0}^{x} e^{-t^{2}}\,dt \end{aligned} $$
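A quick numeric check of how close the tanh approximation is to the exact erf form (standalone, uses only torch):

```python
import math
import torch

x = torch.linspace(-3, 3, 7)
approx = 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * x ** 3)))
exact = 0.5 * x * (1 + torch.erf(x / math.sqrt(2.0)))
print((approx - exact).abs().max())   # small -- on the order of 1e-3 or less
```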
layer_norm.py
```python
class LayerNorm(nn.Module):
    "Construct a layernorm module (See citation for details)."

    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
```

It normalizes each position independently over the last (embedding) dimension
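In formula form, matching the code above (mean and standard deviation taken over the feature dimension of each position):
$$ \operatorname{LayerNorm}(x) = a_2 \odot \frac{x - \mu}{\sigma + \epsilon} + b_2, \qquad \mu = \operatorname{mean}(x),\; \sigma = \operatorname{std}(x) $$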
sublayer.py
```python
class SublayerConnection(nn.Module):
    """
    A residual connection followed by a layer norm.
    Note for code simplicity the norm is first as opposed to last.
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        "Apply residual connection to any sublayer with the same size."
        return x + self.dropout(sublayer(self.norm(x)))
```
In the original paper, Attention Is All You Need, the structure is y = LayerNorm(x + Dropout(Sublayer(x))) — the Post-LN layout
sub-layer first (attention / FFN), then the residual, LayerNorm last
theoretically "cleaner", but unstable to train at depth
Here the code is equivalent to y = x + Dropout(Sublayer(LayerNorm(x))) — the Pre-LN layout
gradients can flow straight along the residual path, which makes deep models far more stable to train; most modern GPT / LLaMA-style models use Pre-LN (the original BERT release itself is Post-LN, so this is a small deviation from the paper)
Building the TransformerBlock class
which wires the previous classes together

```python
class TransformerBlock(nn.Module):
    """
    Bidirectional Encoder = Transformer (self-attention)
    Transformer = MultiHead_Attention + Feed_Forward with sublayer connection
    """

    def __init__(self, hidden, attn_heads, feed_forward_hidden, dropout):
        """
        :param hidden: hidden size of transformer
        :param attn_heads: head sizes of multi-head attention
        :param feed_forward_hidden: feed_forward_hidden, usually 4*hidden_size
        :param dropout: dropout rate
        """

        super().__init__()
        self.attention = MultiHeadedAttention(h=attn_heads, d_model=hidden)
        self.feed_forward = PositionwiseFeedForward(d_model=hidden, d_ff=feed_forward_hidden, dropout=dropout)
        self.input_sublayer = SublayerConnection(size=hidden, dropout=dropout)
        self.output_sublayer = SublayerConnection(size=hidden, dropout=dropout)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, mask):
        x = self.input_sublayer(x, lambda _x: self.attention.forward(_x, _x, _x, mask=mask))
        x = self.output_sublayer(x, self.feed_forward)
        return self.dropout(x)
```
forward:

| parameter | shape | meaning |
|---|---|---|
| x | [B, L, hidden] | the embedding output or the previous layer's output |
| mask | [B, 1, L, L] as built in bert.py (any broadcastable shape works) | the attention mask |
The attention sub-layer:

```python
x = self.input_sublayer(
    x,
    lambda _x: self.attention.forward(_x, _x, _x, mask=mask)
)
```

which is equivalent to:

```python
x_norm = LayerNorm(x)
attn_out = MultiHeadAttention(x_norm, x_norm, x_norm, mask)
x = x + Dropout(attn_out)
```

attention comes before the FFN
LayerNorm is applied before the residual branch (Pre-LN)
token interaction happens here
The FFN sub-layer:

```python
x = self.output_sublayer(x, self.feed_forward)
```

equivalent to:

```python
x_norm = LayerNorm(x)
ffn_out = FeedForward(x_norm)
x = x + Dropout(ffn_out)
```

each token is already a "contextualized" representation at this point
the FFN adds per-token non-linear capacity on top of it
bert
Building the BERT class, which maps a discrete token sequence into multi-layer contextualized continuous representations

```python
class BERT(nn.Module):
    """
    BERT model : Bidirectional Encoder Representations from Transformers.
    """

    def __init__(self, vocab_size, hidden=768, n_layers=12, attn_heads=12, dropout=0.1):
        """
        :param vocab_size: vocab_size of total words
        :param hidden: BERT model hidden size (the paper's d_model, i.e. the embedding dimension)
        :param n_layers: number of Transformer blocks/layers (12 for BASE, 24 for LARGE)
        :param attn_heads: number of attention heads
        :param dropout: dropout rate
        """

        super().__init__()
        self.hidden = hidden
        self.n_layers = n_layers
        self.attn_heads = attn_heads

        # the paper uses 4 * hidden_size for the FFN inner dimension
        self.feed_forward_hidden = hidden * 4

        # embedding = sum of token, segment and positional embeddings
        self.embedding = BERTEmbedding(vocab_size=vocab_size, embed_size=hidden)

        # the stack of Transformer encoder blocks
        self.transformer_blocks = nn.ModuleList(
            [TransformerBlock(hidden, attn_heads, hidden * 4, dropout) for _ in range(n_layers)])

    def forward(self, x, segment_info):
        # attention mask that hides padded positions (pad id = 0)
        mask = (x > 0).unsqueeze(1).repeat(1, x.size(1), 1).unsqueeze(1)

        # embed the indexed sequence into a sequence of vectors
        x = self.embedding(x, segment_info)

        # run it through every Transformer block
        for transformer in self.transformer_blocks:
            x = transformer.forward(x, mask)

        return x
```
Constructing the attention mask:

```python
mask = (x > 0).unsqueeze(1).repeat(1, x.size(1), 1).unsqueeze(1)
```

This is an encoder-style, fully visible mask with no future masking — exactly the difference from GPT
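A step-by-step shape walkthrough of that mask expression (toy batch; assumes pad id 0 as in this repo's vocab):

```python
import torch

x = torch.tensor([[5, 8, 3, 0, 0]])      # [B=1, L=5]; the last two positions are padding
m = (x > 0)                               # [1, 5]        True where the token is real
m = m.unsqueeze(1)                        # [1, 1, 5]
m = m.repeat(1, x.size(1), 1)             # [1, 5, 5]     every query row gets the same key mask
mask = m.unsqueeze(1)                     # [1, 1, 5, 5]  broadcast over the h attention heads
print(mask.shape)                         # torch.Size([1, 1, 5, 5])
```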
Different downstream tasks then need different mappings on top of the encoder output

| task | what it needs |
|---|---|
| MLM | token-level vocabulary logits |
| NSP | sentence-level binary logits |
| classification | the [CLS] vector |
| NER | token-level tag logits |
language_model
turns the bare BERT encoder into a trainable pre-training model
Building the BERTLM class
```python
class BERTLM(nn.Module):
    """
    BERT Language Model
    Next Sentence Prediction Model + Masked Language Model
    """

    def __init__(self, bert: BERT, vocab_size):
        """
        :param bert: BERT model which should be trained
        :param vocab_size: total vocab size for masked_lm
        """

        super().__init__()
        self.bert = bert
        self.next_sentence = NextSentencePrediction(self.bert.hidden)
        self.mask_lm = MaskedLanguageModel(self.bert.hidden, vocab_size)
```
NSP head:
```python
class NextSentencePrediction(nn.Module):
    """
    2-class classification model : is_next, is_not_next
    """

    def __init__(self, hidden):
        """
        :param hidden: BERT model output size
        """
        super().__init__()
        self.linear = nn.Linear(hidden, 2)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        return self.softmax(self.linear(x[:, 0]))
```
The input x[:, 0] takes the representation of each sample's 0-th token
because in BERT the 0-th position ([CLS]) is designed to act as a global semantic summary of the whole sequence
The output is is_next / not_next
dataset.py returns is_next_label along with the batch; during training:

```python
nsp_loss = criterion(nsp_logits, is_next_label)   # criterion = nn.NLLLoss(ignore_index=0), see the trainer below
```
Why later models dropped NSP
Because NSP, as a simple binary classification, does not effectively teach the model sentence-level semantic relations, and it interferes with the more important MLM objective
Its negative samples are too easy, and the task occupies and disturbs the representational capacity of [CLS]; in follow-up work:
RoBERTa: removes NSP entirely and keeps only MLM, learning cross-sentence relations with more data and a stronger training recipe
ALBERT: replaces NSP with SOP (Sentence Order Prediction), which constructs harder, more semantically related negatives
MLM head:
```python
class MaskedLanguageModel(nn.Module):
    """
    predicting origin token from masked input sequence
    n-class classification problem, n-class = vocab_size
    """

    def __init__(self, hidden, vocab_size):
        """
        :param hidden: output size of BERT model
        :param vocab_size: total vocab size
        """
        super().__init__()
        self.linear = nn.Linear(hidden, vocab_size)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        return self.softmax(self.linear(x))
```
Input: x: [B, L, hidden]; output: [B, L, vocab_size]
A vocabulary distribution is predicted for every token position

```python
mlm_criterion = nn.NLLLoss(ignore_index=0)   # positions whose label is 0 (not masked) are ignored
```
A single forward pass computes both tasks at once
```python
class BERTLM(nn.Module):
    def forward(self, x, segment_label):
        x = self.bert(x, segment_label)
        return self.next_sentence(x), self.mask_lm(x)
```
The BERT encoder runs only once; its output is fed to both heads
The output is a tuple:

```python
(
    nsp_logits,   # [B, 2]
    mlm_logits    # [B, L, vocab_size]
)
```
trainer
optim_schedule: building the ScheduledOptim class
which controls the learning rate
```python
class ScheduledOptim():
    '''A simple wrapper class for learning rate scheduling'''

    def __init__(self, optimizer, d_model, n_warmup_steps):
        self._optimizer = optimizer
        self.n_warmup_steps = n_warmup_steps
        self.n_current_steps = 0
        self.init_lr = np.power(d_model, -0.5)

    def step_and_update_lr(self):
        "Step with the inner optimizer"
        self._update_learning_rate()
        self._optimizer.step()

    def zero_grad(self):
        "Zero out the gradients by the inner optimizer"
        self._optimizer.zero_grad()

    def _get_lr_scale(self):
        return np.min([
            np.power(self.n_current_steps, -0.5),
            np.power(self.n_warmup_steps, -1.5) * self.n_current_steps])

    def _update_learning_rate(self):
        ''' Learning rate scheduling per step '''

        self.n_current_steps += 1
        lr = self.init_lr * self._get_lr_scale()

        for param_group in self._optimizer.param_groups:
            param_group['lr'] = lr
```
It is a learning-rate scheduler wrapped around Adam, implementing the Noam schedule from the Transformer paper
For Transformers / BERT the learning rate must start small (warm-up) and then decay; otherwise training tends to diverge early or stall later
Attention Is All You Need gives the formula:
$$ lr = d_{model}^{-0.5} \cdot \min\left(step^{-0.5},\; step \cdot warmup^{-1.5}\right) $$
step_and_update_lr:
first update the learning rate
then call optimizer.step()
so every step runs with the lr that belongs to the current step count (a quick numeric check follows below)
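A quick numeric check of the schedule (hypothetical d_model / warmup values; note that the lr passed to Adam is overwritten every step by this scheduler):

```python
import numpy as np

d_model, warmup = 768, 10000
init_lr = np.power(d_model, -0.5)

for step in [1, 1000, 10000, 100000]:
    scale = min(np.power(step, -0.5), np.power(warmup, -1.5) * step)
    print(step, init_lr * scale)
# lr rises roughly linearly until step == warmup, then decays as step^-0.5
```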
pretrain
the core of the whole repo: the BERTTrainer class, which does three things
assembles BERT + the pre-training heads
defines the loss / optimizer / device
runs the full training loop
__init__:
```python
class BERTTrainer:
    """
    BERTTrainer make the pretrained BERT model with two LM training method.

        1. Masked Language Model : 3.3.1 Task #1: Masked LM
        2. Next Sentence prediction : 3.3.2 Task #2: Next Sentence Prediction

    please check the details on README.md with simple example.
    """

    def __init__(self, bert: BERT, vocab_size: int,
                 train_dataloader: DataLoader, test_dataloader: DataLoader = None,
                 lr: float = 1e-4, betas=(0.9, 0.999), weight_decay: float = 0.01, warmup_steps=10000,
                 with_cuda: bool = True, cuda_devices=None, log_freq: int = 10):
        """
        :param bert: BERT model which you want to train
        :param vocab_size: total word vocab size
        :param train_dataloader: train dataset data loader
        :param test_dataloader: test dataset data loader [can be None]
        :param lr: learning rate of optimizer
        :param betas: Adam optimizer betas
        :param weight_decay: Adam optimizer weight decay param
        :param with_cuda: training with cuda
        :param log_freq: logging frequency of the batch iteration
        """

        # device selection: use GPU 0 if available and requested
        cuda_condition = torch.cuda.is_available() and with_cuda
        self.device = torch.device("cuda:0" if cuda_condition else "cpu")

        # the bare encoder (saved after training) and the full pre-training model (trained)
        self.bert = bert
        self.model = BERTLM(bert, vocab_size).to(self.device)

        # multi-GPU training via DataParallel if more than one GPU is visible
        if with_cuda and torch.cuda.device_count() > 1:
            print("Using %d GPUS for BERT" % torch.cuda.device_count())
            self.model = nn.DataParallel(self.model, device_ids=cuda_devices)

        self.train_data = train_dataloader
        self.test_data = test_dataloader

        # Adam wrapped in the Noam learning-rate schedule
        self.optim = Adam(self.model.parameters(), lr=lr, betas=betas, weight_decay=weight_decay)
        self.optim_schedule = ScheduledOptim(self.optim, self.bert.hidden, n_warmup_steps=warmup_steps)

        # one shared NLL loss; label 0 (padding / unmasked positions) is ignored
        self.criterion = nn.NLLLoss(ignore_index=0)

        self.log_freq = log_freq

        print("Total Parameters:", sum([p.nelement() for p in self.model.parameters()]))
```
self.bert: the bare encoder (this is what gets saved)
self.model: BERT + the MLM and NSP heads (this is what gets trained)
👉 what is trained is BERTLM, not BERT
The DataLoader feeds in the dict built in dataset.py:

```python
{
    "bert_input",
    "segment_label",
    "bert_label",
    "is_next"
}
```

Optimizer + scheduler: from here on, the trainer only calls optim_schedule and never touches Adam directly
Loss function: NLLLoss
| aspect | NLLLoss | CrossEntropyLoss |
|---|---|---|
| input | log-probabilities | raw logits |
| applies softmax internally | ❌ no | ✅ yes |
| takes the log internally | ❌ no | ✅ yes |
| essence | negative log-likelihood | LogSoftmax + NLL |
| ease of use | easy to misuse | safer |
Why the same NLLLoss instance can be reused for both tasks
NSP: [B, 2] predictions vs. [B] targets
MLM: [B, L, vocab] predictions vs. [B, L] targets
labels at non-masked positions are 0 and are skipped via ignore_index=0 (note the side effect: NSP samples whose is_next label is 0, i.e. "not next", are ignored by the NSP loss as well); a shape-only sketch follows below
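A shape-only sketch of the two criterion calls (random placeholder tensors, not real model output):

```python
import torch
import torch.nn as nn

criterion = nn.NLLLoss(ignore_index=0)

B, L, V = 4, 16, 1000
nsp_logp = torch.log_softmax(torch.randn(B, 2), dim=-1)        # [B, 2]
is_next = torch.tensor([1, 0, 1, 1])                           # [B]
mlm_logp = torch.log_softmax(torch.randn(B, L, V), dim=-1)     # [B, L, V]
bert_label = torch.randint(0, V, (B, L))                       # [B, L], 0 wherever the token was not masked

next_loss = criterion(nsp_logp, is_next)                       # class dim is already dim=1
mask_loss = criterion(mlm_logp.transpose(1, 2), bert_label)    # move vocab to dim=1: [B, V, L] vs [B, L]
loss = next_loss + mask_loss
```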
iteration: where training actually happens
```python
def iteration(self, epoch, data_loader, train=True):
    """
    loop over the data_loader for training or testing
    if on train status, backward operation is activated
    and also auto save the model every epoch

    :param epoch: current epoch index
    :param data_loader: torch.utils.data.DataLoader for iteration
    :param train: boolean value of is train or test
    :return: None
    """
    str_code = "train" if train else "test"

    # tqdm progress bar over the batches
    data_iter = tqdm.tqdm(enumerate(data_loader),
                          desc="EP_%s:%d" % (str_code, epoch),
                          total=len(data_loader),
                          bar_format="{l_bar}{r_bar}")

    avg_loss = 0.0
    total_correct = 0
    total_element = 0

    for i, data in data_iter:
        # 0. move the batch to the target device
        data = {key: value.to(self.device) for key, value in data.items()}

        # 1. forward pass: BERT encoder + NSP head + MLM head
        next_sent_output, mask_lm_output = self.model.forward(data["bert_input"], data["segment_label"])

        # 2-1. NSP loss on the is_next classification
        next_loss = self.criterion(next_sent_output, data["is_next"])

        # 2-2. MLM loss on the masked-token predictions (class dim moved to dim=1)
        mask_loss = self.criterion(mask_lm_output.transpose(1, 2), data["bert_label"])

        # 2-3. the pre-training loss is simply the sum of the two
        loss = next_loss + mask_loss

        # 3. backward + optimizer step only in train mode
        if train:
            self.optim_schedule.zero_grad()
            loss.backward()
            self.optim_schedule.step_and_update_lr()

        # NSP accuracy bookkeeping
        correct = next_sent_output.argmax(dim=-1).eq(data["is_next"]).sum().item()
        avg_loss += loss.item()
        total_correct += correct
        total_element += data["is_next"].nelement()

        post_fix = {
            "epoch": epoch,
            "iter": i,
            "avg_loss": avg_loss / (i + 1),
            "avg_acc": total_correct / total_element * 100,
            "loss": loss.item()
        }

        if i % self.log_freq == 0:
            data_iter.write(str(post_fix))

    print("EP%d_%s, avg_loss=" % (epoch, str_code), avg_loss / len(data_iter),
          "total_acc=", total_correct * 100.0 / total_element)
```
The forward pass (BERT + the two heads):

| output | shape |
|---|---|
| next_sent_output | [B, 2] |
| mask_lm_output | [B, L, vocab_size] |
```python
mask_loss = self.criterion(mask_lm_output.transpose(1, 2), data["bert_label"])
```

Why transpose(1, 2)?
Because nn.NLLLoss expects the class dimension at dim=1, while the MLM output puts the vocab dimension last, so it has to be transposed

```
input:  [N, C, *]
target: [N, *]
```
Accuracy is tracked for NSP only

```python
correct = next_sent_output.argmax(dim=-1).eq(data["is_next"])
```

because:
MLM accuracy is not very meaningful to eyeball
and the mask ratio is low (only about 15% of tokens contribute)
Only the encoder is saved, not the heads: after pre-training, the MLM / NSP heads are normally thrown away and downstream tasks attach fresh heads
The overall flow:

```
Dataset → DataLoader
      ↓
BERTLM.forward
 ├─ BERT Encoder
 ├─ NSP head
 └─ MLM head
      ↓
loss = NSP + MLM
      ↓
backward
      ↓
ScheduledOptim.step
```
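Putting it all together, a minimal end-to-end sketch (hypothetical file paths and deliberately small hyper-parameters; it drives iteration directly, mirroring the flow above, and assumes the classes from this repo are importable):

```python
from torch.utils.data import DataLoader

vocab = WordVocab.load_vocab("data/vocab.small")               # hypothetical vocab file
train_set = BERTDataset("data/corpus.small", vocab, seq_len=64)
train_loader = DataLoader(train_set, batch_size=32, shuffle=True)

bert = BERT(vocab_size=len(vocab), hidden=256, n_layers=4, attn_heads=4)
trainer = BERTTrainer(bert, len(vocab), train_dataloader=train_loader)

for epoch in range(10):
    trainer.iteration(epoch, train_loader, train=True)
```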
Summary
BERT = Embedding → a stack of Transformer encoder layers → the pre-training tasks (MLM + NSP)

| paper component | code file |
|---|---|
| Input Representation | embedding/ (token / segment / position) |
| Transformer Encoder | transformer.py + bert.py |
| MLM | language_model.py → MaskedLanguageModel |
| NSP | language_model.py → NextSentencePrediction |
| Training | pretrain.py + optim_schedule.py |
Downstream fine-tuning:

| task type | example input | output | representative datasets |
|---|---|---|---|
| sentence-pair classification | [CLS] sentence A [SEP] sentence B [SEP] | classify from the [CLS] vector | MNLI, QQP, QNLI |
| single-sentence classification | [CLS] sentence A [SEP] | classify from the [CLS] vector | SST-2, CoLA |
| question answering | [CLS] question [SEP] context [SEP] | predict start / end positions over all tokens | SQuAD |
| single-sentence tagging | [CLS] sentence A [SEP] | classify every token's vector separately | CoNLL-2003 NER |
The common datasets in more detail
Sentence-pair classification:

| dataset | input | output | what the model learns |
|---|---|---|---|
| MNLI | a premise and a hypothesis | entailment / contradiction / neutral | the logical-semantic relation between two sentences |
| QQP | two questions | semantically equivalent or not (yes / no) | recognizing the same meaning under different wordings |
Single-sentence classification:

| dataset | input | output | what the model learns |
|---|---|---|---|
| SST-2 | one sentence | positive / negative | sentiment analysis |
| CoLA | one sentence | grammatically acceptable or not (yes / no) | implicit grammar rules rather than semantics |
Single-sentence tagging:

| dataset | input | output | what the model learns |
|---|---|---|---|
| CoNLL-2003 NER | one sentence | a label predicted for every token | per-token understanding plus contextual dependencies |