Y1ran/NLP-BERT–ChineseVersion: a code walkthrough of this project, to understand BERT and the details of reproducing the paper

Original paper: BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding - ACL Anthology

The two main families of NLP tasks:

  • recognition and classification, e.g. text classification and sentiment analysis
  • text generation, e.g. machine translation and chatbots

Both rest on the Transformer architecture, but BERT builds on the encoder while GPT builds on the decoder.

As the paper puts it, BERT works like a cloze test: it is a bidirectional model.

Project overview

bert_pytorch/
├── dataset/              # data & vocab
├── model/                # the BERT model itself (core)
│   ├── attention/        # self-attention implementation
│   ├── embedding/        # Token / Position / Segment Embedding
│   ├── utils/            # FFN / LayerNorm / residual structure
│   ├── transformer.py
│   ├── bert.py
│   └── language_model.py
└── trainer/              # pre-training loop

dataset

dataset.py

Building BERTDataset

It turns one line (a sentence pair) into the 4 tensors that BERT pre-training needs,

covering the two pre-training tasks from the paper:

  1. MLM (Masked Language Model)
  2. NSP (Next Sentence Prediction)

__init__:

class BERTDataset(Dataset):
    def __init__(self, corpus_path, vocab, seq_len, encoding="utf-8", corpus_lines=None, on_memory=True):
        self.vocab = vocab                # vocabulary, token -> id
        self.seq_len = seq_len            # BERT maximum sequence length

        self.on_memory = on_memory        # True: load the whole corpus into memory; False: stream from disk
        self.corpus_lines = corpus_lines  # total number of corpus lines
        self.corpus_path = corpus_path    # corpus file path
        self.encoding = encoding

        with open(corpus_path, "r", encoding=encoding) as f:
            if self.corpus_lines is None and not on_memory:
                for _ in tqdm.tqdm(f, desc="Loading Dataset", total=corpus_lines):
                    self.corpus_lines += 1  # count how many lines there are

            if on_memory:
                self.lines = [line[:-1].split("\t")  # strip the newline, split on tab into a sentence pair
                              for line in tqdm.tqdm(f, desc="Loading Dataset", total=corpus_lines)]
                self.corpus_lines = len(self.lines)

        if not on_memory:
            self.file = open(corpus_path, "r", encoding=encoding)
            self.random_file = open(corpus_path, "r", encoding=encoding)

            for _ in range(random.randint(self.corpus_lines if self.corpus_lines < 1000 else 1000)):
                self.random_file.__next__()  # advance random_file's pointer to some position in the file, roughly at random

The corpus file format is: sentence A\tsentence B

on_memory=True means:

  • the whole corpus file is read into memory at once
  • each line becomes [sentence1, sentence2]

This only suits small corpora; a large corpus would put heavy pressure on memory.


__getitem__: fetching the raw sentence pair

def __getitem__(self, item):
    t1, t2, is_next_label = self.random_sent(item)
    ...

def random_sent(self, index):
    t1, t2 = self.get_corpus_line(index)

    # output_text, label(isNotNext:0, isNext:1)
    if random.random() > 0.5:
        return t1, t2, 1
    else:
        return t1, self.get_random_line(), 0

def get_corpus_line(self, item):
    if self.on_memory:
        return self.lines[item][0], self.lines[item][1]
    else:
        line = self.file.__next__()
        if line is None:
            self.file.close()
            self.file = open(self.corpus_path, "r", encoding=self.encoding)
            line = self.file.__next__()

        t1, t2 = line[:-1].split("\t")
        return t1, t2

def get_random_line(self):
    if self.on_memory:
        return self.lines[random.randrange(len(self.lines))][1]

    line = self.file.__next__()
    if line is None:
        self.file.close()
        self.file = open(self.corpus_path, "r", encoding=self.encoding)
        for _ in range(random.randint(self.corpus_lines if self.corpus_lines < 1000 else 1000)):
            self.random_file.__next__()
        line = self.random_file.__next__()
    return line[:-1].split("\t")[1]

In every case we first take

t1 = the first sentence of line `index`
t2 = the second sentence of line `index`

get_corpus_line depends on on_memory: if True it indexes directly into the in-memory list, otherwise it reads the next line from the file stream (reopening from the start once the file is exhausted).

Then, depending on a random draw, t2 is replaced:

  • 50%: the real following sentence → label = 1

  • 50%: a random sentence → label = 0


__getitem__: corrupting the sentences (MLM masking)

def __getitem__(self, item):
    ...
    t1_random, t1_label = self.random_word(t1)
    t2_random, t2_label = self.random_word(t2)
    ...

def random_word(self, sentence):
    tokens = sentence.split()  # assumes sentence is already whitespace-tokenized text
    output_label = []

    for i, token in enumerate(tokens):
        prob = random.random()
        # 15% of tokens take part in MLM; masking then splits into three cases
        if prob < 0.15:
            prob /= 0.15  # renormalize the probability for the 80 / 10 / 10 split below

            # 80% randomly change token to mask token
            if prob < 0.8:
                # replace with the index of [MASK]
                tokens[i] = self.vocab.mask_index

            # 10% randomly change token to random token
            elif prob < 0.9:
                # replace with a random index
                tokens[i] = random.randrange(len(self.vocab))

            # 10% randomly change token to current token
            else:
                # keep the original index
                tokens[i] = self.vocab.stoi.get(token, self.vocab.unk_index)

            # these positions get the original index (non-zero) as their label and contribute to the MLM loss
            output_label.append(self.vocab.stoi.get(token, self.vocab.unk_index))

        else:
            tokens[i] = self.vocab.stoi.get(token, self.vocab.unk_index)
            # these positions get label 0 and are excluded from the MLM loss
            output_label.append(0)

    return tokens, output_label

Example:

sentence = "我 爱 自然 语言 处理"   # "I love natural language processing"
我 → 10
爱 → 11
自然 → 12
语言 → 13
处理 → 14
[MASK] → 3
[UNK] → 1

| position i | token | random.random() | action |
|---|---|---|---|
| 0 | 我 | 0.62 | keep id 10, label 0 |
| 1 | 爱 | 0.04 | replace with [MASK] (id 3), label 11 |
| 2 | 自然 | 0.81 | keep id 12, label 0 |
| 3 | 语言 | 0.12 | replace with a random token, e.g. id 25, label 13 |
| 4 | 处理 | 0.30 | keep id 14, label 0 |

tokens returned: [10, 3, 12, 25, 14], corresponding to the text "我 [MASK] 自然 [random token] 处理" — the deliberately corrupted input (a kind of augmentation).

output_label returned: [0, 11, 0, 13, 0], which always keeps the original correct answers.


def __getitem__(self, item):
    ...
    # [CLS] tag = SOS tag, [SEP] tag = EOS tag
    t1 = [self.vocab.sos_index] + t1_random + [self.vocab.eos_index]
    t2 = t2_random + [self.vocab.eos_index]

    t1_label = [self.vocab.pad_index] + t1_label + [self.vocab.pad_index]
    t2_label = t2_label + [self.vocab.pad_index]

    # segment 1 for the t1 part, segment 2 for the t2 part
    segment_label = ([1 for _ in range(len(t1))] + [2 for _ in range(len(t2))])[:self.seq_len]
    # truncate to BERT's maximum length
    bert_input = (t1 + t2)[:self.seq_len]
    bert_label = (t1_label + t2_label)[:self.seq_len]
    # truncate first, then pad everything to the fixed length: input (pad token), label (pad_index), segment_label (0)
    padding = [self.vocab.pad_index for _ in range(self.seq_len - len(bert_input))]
    bert_input.extend(padding), bert_label.extend(padding), segment_label.extend(padding)

The special markers [CLS] and [SEP] are added to the corrupted t1/t2 tokens and their labels,

forming the input structure [CLS] sentence A [SEP] sentence B [SEP].

Then a segment_label is attached to the sentences; this is what lets BERT tell the two sentences apart.

The final output tensors:

def __getitem__(self, item):
    ...
    output = {"bert_input": bert_input,          # token ids
              "bert_label": bert_label,          # MLM labels
              "segment_label": segment_label,
              "is_next": is_next_label}          # NSP label

    return {key: torch.tensor(value) for key, value in output.items()}
| key | role |
|---|---|
| bert_input | input to the BERT encoder |
| bert_label | supervision for the MLM loss |
| segment_label | distinguishes sentence A / B |
| is_next | NSP task label |
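To see these four tensors concretely, a minimal usage sketch like the one below can help; the import path, file names and the load_vocab helper are assumptions based on this repo's layout, so adjust them to your checkout.

# Sketch only: wire BERTDataset into a DataLoader and inspect one batch.
from torch.utils.data import DataLoader
from bert_pytorch.dataset import BERTDataset, WordVocab   # assumed import path

vocab = WordVocab.load_vocab("data/vocab.small")           # hypothetical vocab file
dataset = BERTDataset("data/corpus.small", vocab, seq_len=64, on_memory=True)
loader = DataLoader(dataset, batch_size=8, shuffle=True)

batch = next(iter(loader))
for key, tensor in batch.items():
    print(key, tuple(tensor.shape))
# expected:
#   bert_input    (8, 64)
#   bert_label    (8, 64)
#   segment_label (8, 64)
#   is_next       (8,)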

vocab

TorchVocab → generic vocabulary machinery (sorting, stoi/itos)
Vocab      → fixes the special-token semantics BERT needs
WordVocab  → turns "real text" into trainable id sequences

Building TorchVocab

The docstring:

"""
Defines a vocabulary object that will be used to numericalize a field.
Attributes:
    freqs: a collections.Counter object holding the frequencies of the tokens in the data used to build the vocabulary
    stoi: a collections.defaultdict instance mapping token strings to numerical identifiers
    itos: a list of token strings indexed by their numerical identifiers
"""

itos: id → token (index to string)

stoi: token → id (string to index)

It also supports:

  • capping the vocabulary size with max_size
  • filtering out low-frequency words with min_freq
  • preset special symbols specials (e.g. <pad>, <unk>), which are placed at the very front of the vocabulary

__init__

class TorchVocab(object):
    def __init__(self, counter, max_size=None, min_freq=1, specials=['<pad>', '<oov>'],
                 vectors=None, unk_init=None, vectors_cache=None):
        self.freqs = counter
        counter = counter.copy()
        min_freq = max(min_freq, 1)  # enforce a minimum frequency of at least 1

        self.itos = list(specials)
        # frequencies of special tokens are not counted when building vocabulary
        # in frequency order
        for tok in specials:
            del counter[tok]

        max_size = None if max_size is None else max_size + len(self.itos)

The specials are put into itos first and deleted from counter,

so no matter how often they appear in the corpus they always occupy the lowest ids.

max_size is increased by the length of specials so that the special symbols can never be squeezed out.

def __init__():
    ...
    # sort by frequency, then alphabetically
    words_and_frequencies = sorted(counter.items(), key=lambda tup: tup[0])
    words_and_frequencies.sort(key=lambda tup: tup[1], reverse=True)

A classic "stable sort" idiom:

  1. first sort the tokens lexicographically
  2. then sort by frequency in descending order (Python's sort is stable)

With the same data and the same parameters, the vocabulary ids never drift because of hash or iteration order.

def __init__():
    ...
    for word, freq in words_and_frequencies:
        if freq < min_freq or len(self.itos) == max_size:
            break
        self.itos.append(word)

    # stoi is simply a reverse dict for itos
    self.stoi = {tok: i for i, tok in enumerate(self.itos)}

Words are appended to itos one by one following these rules, and the reverse dictionary stoi is then built from itos.

After that, stoi['word'] = id and itos[id] = 'word' both work.
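The deterministic ordering is easy to check with a standalone sketch of the same logic (plain Counter, not the project's class):

from collections import Counter

counter = Counter({"the": 5, "cat": 3, "dog": 3, "zebra": 1})
specials = ["<pad>", "<oov>"]
min_freq, max_size = 2, None

itos = list(specials)
words_and_frequencies = sorted(counter.items(), key=lambda tup: tup[0])   # alphabetical first
words_and_frequencies.sort(key=lambda tup: tup[1], reverse=True)          # then by frequency (stable)

for word, freq in words_and_frequencies:
    if freq < min_freq or len(itos) == max_size:
        break
    itos.append(word)

stoi = {tok: i for i, tok in enumerate(itos)}
print(itos)   # ['<pad>', '<oov>', 'the', 'cat', 'dog']  ('zebra' filtered out by min_freq)
print(stoi)   # {'<pad>': 0, '<oov>': 1, 'the': 2, 'cat': 3, 'dog': 4}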


Building Vocab

This binds the generic vocabulary to BERT's concrete token semantics.

class Vocab(TorchVocab):
    def __init__(self, counter, max_size=None, min_freq=1):
        self.pad_index = 0
        self.unk_index = 1
        self.eos_index = 2
        self.sos_index = 3
        self.mask_index = 4
        super().__init__(
            counter,
            specials=["<pad>", "<unk>", "<eos>", "<sos>", "<mask>"],
            max_size=max_size,
            min_freq=min_freq
        )
| index | token | role in BERT |
|---|---|---|
| 0 | `<pad>` | padding (excluded from every loss) |
| 1 | `<unk>` | fallback for unknown words |
| 2 | `<eos>` | end of sentence |
| 3 | `<sos>` | start of sentence |
| 4 | `<mask>` | the MLM mask token |

Building WordVocab

class WordVocab(Vocab):
    def __init__(self, texts, max_size=None, min_freq=1):
        counter = Counter()
        for line in texts:
            if isinstance(line, list):
                words = line
            else:
                words = line.replace("\n", "").replace("\t", "").split()  # whitespace tokenization
            for word in words:
                counter[word] += 1
        super().__init__(counter, max_size=max_size, min_freq=min_freq)

The input texts can take either of two forms.

Already tokenized:

texts = [
    ["我", "爱", "自然", "语言"],
    ["语言", "模型"]
]

Raw strings:

texts = [
    "我 爱 自然 语言",
    "语言 模型"
]

Word frequencies are counted during initialization.

to_seq and from_seq are the most important functions.

def to_seq(self, sentence, seq_len=None, with_eos=False, with_sos=False, with_len=False):
    if isinstance(sentence, str):
        sentence = sentence.split()

    seq = [self.stoi.get(word, self.unk_index) for word in sentence]

    # optionally add the special start / end markers
    if with_eos:
        seq += [self.eos_index]  # this would be index 1
    if with_sos:
        seq = [self.sos_index] + seq

    # keep the real, pre-padding length (many downstream tasks need it)
    origin_seq_len = len(seq)
    # depending on seq_len, decide whether to pad or truncate
    if seq_len is None:
        pass
    elif len(seq) <= seq_len:
        seq += [self.pad_index for _ in range(seq_len - len(seq))]
    else:
        seq = seq[:seq_len]

    return (seq, origin_seq_len) if with_len else seq

# the inverse of to_seq
def from_seq(self, seq, join=False, with_pad=False):
    words = [self.itos[idx]
             if idx < len(self.itos)
             else "<%d>" % idx
             for idx in seq
             if with_pad or idx != self.pad_index]

    # join controls whether the words are concatenated into a single string
    return " ".join(words) if join else words

The input sentence again takes either of the two forms.

seq = [self.stoi.get(word, self.unk_index) for word in sentence]

  • in the vocabulary → its id
  • not in the vocabulary → <unk>

origin_seq_len = len(seq) is typically used for:

  • RNNs (true sequence lengths)
  • attention masks
  • loss normalization
words = [
    self.itos[idx] if idx < len(self.itos)
    else "<%d>" % idx
    for idx in seq
    if with_pad or idx != self.pad_index
]

  • idx < len(self.itos) → a normal word
  • otherwise → rendered as something like <12345> instead of crashing
  • with_pad=False → pads are dropped by default
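A quick round-trip sketch, assuming the WordVocab class above is importable (the module path is an assumption; adjust it to your checkout):

from bert_pytorch.dataset import WordVocab   # assumed import path

vocab = WordVocab(["我 爱 自然 语言", "语言 模型"], min_freq=1)

seq, n = vocab.to_seq("我 爱 模型 Transformer", seq_len=8,
                      with_sos=True, with_eos=True, with_len=True)
print(seq, n)   # e.g. [<sos>, id(我), id(爱), id(模型), <unk>=1, <eos>=2, 0, 0] and n = 6
                # "Transformer" is out of vocabulary, so it falls back to <unk>

print(vocab.from_seq(seq))              # pads dropped, special tokens rendered via itos
print(vocab.from_seq(seq, join=True))   # same, joined into one string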

The build function:

a standalone script entry point for building the vocabulary.

def build():
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--corpus_path", required=True, type=str)
    parser.add_argument("-o", "--output_path", required=True, type=str)
    parser.add_argument("-s", "--vocab_size", type=int, default=None)
    parser.add_argument("-e", "--encoding", type=str, default="utf-8")
    parser.add_argument("-m", "--min_freq", type=int, default=1)
    args = parser.parse_args()

    with open(args.corpus_path, "r", encoding=args.encoding) as f:
        vocab = WordVocab(f, max_size=args.vocab_size, min_freq=args.min_freq)

    print("VOCAB SIZE:", len(vocab))
    vocab.save_vocab(args.output_path)

It builds a WordVocab from the raw corpus and saves this frozen vocabulary to disk;

both training and inference then simply call `load_vocab`.

model


BERT differs from the original Transformer in that, besides the token and position embeddings, it adds a segment embedding.

embedding

token_id    ∈ ℕ
segment_id  ∈ {0,1,2}
position_id ∈ {0,1,2,...}

    ↓ separate lookup / computation

TokenEmbedding      → [B, L, D]
SegmentEmbedding    → [B, L, D]
PositionalEmbedding → [1, L, D]

    ↓ element-wise sum

Embedding Output → [B, L, D]

Building the BERTEmbedding class:

it projects three complementary kinds of discrete information into the same vector space and adds them.

class BERTEmbedding(nn.Module):
    """
    BERT Embedding which is composed of the following features
        1. TokenEmbedding : normal embedding matrix
        2. PositionalEmbedding : adding positional information using sin, cos
        3. SegmentEmbedding : adding sentence segment info, (sent_A:1, sent_B:2)

        sum of all these features are output of BERTEmbedding
    """

    def __init__(self, vocab_size, embed_size, dropout=0.1):
        """
        :param vocab_size: total vocab size
        :param embed_size: embedding size of token embedding
        :param dropout: dropout rate
        """
        super().__init__()
        self.token = TokenEmbedding(vocab_size=vocab_size, embed_size=embed_size)
        self.position = PositionalEmbedding(d_model=self.token.embedding_dim)
        self.segment = SegmentEmbedding(embed_size=self.token.embedding_dim)
        self.dropout = nn.Dropout(p=dropout)
        self.embed_size = embed_size

    def forward(self, sequence, segment_label):
        x = self.token(sequence) + self.position(sequence) + self.segment(segment_label)
        return self.dropout(x)

__init__

vocab_size: the vocabulary size, which determines the number of rows of the embedding lookup table and the valid range of token ids.

embed_size: BERT's hidden dimension, matching hidden_size in the paper.

In the paper:

BASE  (L=12, H=768,  A=12, Total Parameters=110M)
LARGE (L=24, H=1024, A=16, Total Parameters=340M)

TokenEmbedding

class TokenEmbedding(nn.Embedding):
    def __init__(self, vocab_size, embed_size=512):
        super().__init__(vocab_size, embed_size, padding_idx=0)

The key constraint is padding_idx=0: the embedding row of the pad token is initialized to zeros and never receives gradient updates.

PositionalEmbedding

class PositionalEmbedding(nn.Module):

    def __init__(self, d_model, max_len=512):
        super().__init__()

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model).float()
        pe.require_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]
| parameter | meaning |
|---|---|
| d_model | embedding dimension D |
| max_len | maximum supported sequence length |

pe.require_grad = False is meant to keep the positional encoding out of training; strictly speaking, registering pe as a buffer is what actually keeps it out of the parameter list (PyTorch's real flag is spelled requires_grad).
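A standalone check of the sinusoidal table (same formula as above, not the project's class): the output is [1, L, D], and the even columns hold sin(position / 10000^(2i/D)).

import math
import torch

d_model, max_len = 8, 16
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len).float().unsqueeze(1)
div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)                       # [1, max_len, d_model]

x = torch.zeros(2, 10, d_model)            # dummy batch; only its length matters
print(pe[:, :x.size(1)].shape)             # torch.Size([1, 10, 8]) — broadcast over the batch when added
print(torch.allclose(pe[0, 3, 0], torch.sin(torch.tensor(3.0))))  # position 3, dim 0: sin(3 / 10000^0) = sin(3) → True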

SegmentEmbedding

class SegmentEmbedding(nn.Embedding):
    def __init__(self, embed_size=512):
        super().__init__(3, embed_size, padding_idx=0)

num_embeddings=3 here because:

| id | meaning |
|---|---|
| 0 | padding |
| 1 | sentence A |
| 2 | sentence B |

The same token therefore has its embedding shifted as a whole to a different region depending on whether it sits in sentence A or sentence B.

| | input | output | information carried |
|---|---|---|---|
| Token | token id | [B, L, D] | word identity |
| Position | sequence length | [1, L, D] | positional information |
| Segment | segment id | [B, L, D] | sentence identity |

The linear layers and attention further down the Transformer learn on their own how to decode this mixture of the three signals.

attention

Building MultiHeadedAttention

class MultiHeadedAttention(nn.Module):
    """
    Take in model size and number of heads.
    """

    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0

        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h

        self.linear_layers = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(3)])
        self.output_linear = nn.Linear(d_model, d_model)
        self.attention = Attention()

        self.dropout = nn.Dropout(p=dropout)

assert d_model % h == 0: every head must get the same number of dimensions.

Each head has dimension d_k.

Taking BASE as an example:

d_model = 768
h = 12
→ d_k = 64

self.linear_layers = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(3)]) builds the three linear layers that produce Q / K / V.

Each first maps d_model to d_model with a single Linear, then reshapes the result into h heads.

def forward(self, query, key, value, mask=None):
    batch_size = query.size(0)

    # 1) Do all the linear projections in batch from d_model => h x d_k
    query, key, value = [l(x).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
                         for l, x in zip(self.linear_layers, (query, key, value))]

    # 2) Apply attention on all the projected vectors in batch.
    x, attn = self.attention(query, key, value, mask=mask, dropout=self.dropout)

    # 3) "Concat" using a view and apply a final linear.
    x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)

    return self.output_linear(x)

In BERT's self-attention, typically:

query = key = value = x

Linear projection + reshape:

query, key, value = [
    l(x)
    .view(batch_size, -1, self.h, self.d_k)  # [B, L, h, d_k]
    .transpose(1, 2)                         # [B, h, L, d_k] — the layout Attention.forward expects
    for l, x in zip(self.linear_layers, (query, key, value))
]

Scaled dot-product attention:

class Attention(nn.Module):
    """
    Compute 'Scaled Dot Product Attention'
    """

    def forward(self, query, key, value, mask=None, dropout=None):
        scores = torch.matmul(query, key.transpose(-2, -1)) \
                 / math.sqrt(query.size(-1))
        # [B, h, L, d_k] × [B, h, d_k, L] → [B, h, L, L]
        # positions that must not be attended to end up with probability ≈ 0 after the softmax
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        p_attn = F.softmax(scores, dim=-1)

        if dropout is not None:
            p_attn = dropout(p_attn)

        return torch.matmul(p_attn, value), p_attn

The output is [B, h, L, d_k].

Then the heads are merged back:

x = x.transpose(1, 2) \
     .contiguous() \
     .view(batch_size, -1, self.h * self.d_k)

Overall this implements:

Embedding output: [B, L, D]
        ↓
MultiHeadAttention
        ↓
[B, L, D]  (same shape, but the content is now contextualized)

letting the tokens see one another.
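A standalone shape walk-through of the same steps (plain tensor ops on random data, not the project's module), using BASE-like sizes:

import math
import torch
import torch.nn as nn

B, L, h, d_k = 2, 5, 12, 64
d_model = h * d_k                                   # 768

x = torch.randn(B, L, d_model)
projections = [nn.Linear(d_model, d_model) for _ in range(3)]   # Q / K / V projections

q, k, v = [p(x).view(B, -1, h, d_k).transpose(1, 2) for p in projections]   # each [B, h, L, d_k]

scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
print(scores.shape)                                 # torch.Size([2, 12, 5, 5])

out = torch.matmul(torch.softmax(scores, dim=-1), v)            # [B, h, L, d_k]
merged = out.transpose(1, 2).contiguous().view(B, -1, h * d_k)
print(merged.shape)                                 # torch.Size([2, 5, 768]) — back to [B, L, D]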

utils

| file | role |
|---|---|
| feed_forward.py | FFN (per-token nonlinear transform) |
| gelu.py | the activation function BERT uses |
| layer_norm.py | LayerNorm (normalization over the feature dimension) |
| sublayer.py | unified wrapper for residual + LayerNorm + Dropout |

feed_forward.py

class PositionwiseFeedForward(nn.Module):
    "Implements FFN equation."

    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = GELU()

    def forward(self, x):
        return self.w_2(self.dropout(self.activation(self.w_1(x))))

The FFN does not cross tokens: its linear layers act only on the last dimension and never mix along the L dimension, so it provides per-token nonlinear capacity.

GELU is chosen because:

  • the attention output is a continuous probabilistic mixture

  • a hard ReLU would break that continuity

  • GELU better matches the semantics of "probabilistic gating"

gelu.py

class GELU(nn.Module):
    """
    Paper Section 3.4, last paragraph notice that BERT used the GELU instead of RELU
    """

    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))

The tanh approximation:
$$
\operatorname{GELU}(x) \approx 0.5 x\left(1+\tanh \left(\sqrt{\frac{2}{\pi}}\left(x+0.044715 x^{3}\right)\right)\right)
$$
This is the version proposed by Hendrycks & Gimpel (2016).

The exact form, written with the error function:
$$
\begin{aligned}
\operatorname{GELU}(x) & =\frac{x}{2}\left(1+\operatorname{erf}\left(\frac{x}{\sqrt{2}}\right)\right) \\
\operatorname{erf}(x) & =\frac{2}{\sqrt{\pi}} \int_{0}^{x} e^{-t^{2}} d t
\end{aligned}
$$
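A quick numerical check (my own sketch, not repo code) confirms the tanh approximation stays close to the exact erf-based GELU:

import math
import torch

x = torch.linspace(-6, 6, steps=1001)
gelu_tanh = 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * x ** 3)))
gelu_exact = 0.5 * x * (1 + torch.erf(x / math.sqrt(2.0)))

print((gelu_tanh - gelu_exact).abs().max())   # small — well below 1e-2 on this range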

layer_norm.py

class LayerNorm(nn.Module):
    "Construct a layernorm module (See citation for details)."

    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

It normalizes each position independently over the last (embedding) dimension.

sublayer.py

class SublayerConnection(nn.Module):
    """
    A residual connection followed by a layer norm.
    Note for code simplicity the norm is first as opposed to last.
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        "Apply residual connection to any sublayer with the same size."
        return x + self.dropout(sublayer(self.norm(x)))

In the original paper "Attention Is All You Need" the rule is y = LayerNorm(x + Dropout(Sublayer(x))), the Post-LN structure:

run the sublayer (attention / FFN) first, then the residual addition, then LayerNorm.

It is "cleaner" in theory but becomes unstable to train as the network gets deep.

Here the code is equivalent to y = x + Dropout(Sublayer(LayerNorm(x))), the Pre-LN structure:

gradients can flow straight along the residual path and deep models train much more stably; GPT-style models and LLaMA adopt Pre-LN (note that the official BERT release itself uses Post-LN, so this repo differs from the paper on this point).
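A side-by-side sketch of the two orderings (illustrative helpers, not repo code); `sublayer` stands for either the attention block or the FFN:

import torch
import torch.nn as nn

def post_ln_step(x, sublayer, norm, dropout):
    # "Attention Is All You Need": sublayer → residual add → LayerNorm
    return norm(x + dropout(sublayer(x)))

def pre_ln_step(x, sublayer, norm, dropout):
    # this repo's SublayerConnection: LayerNorm → sublayer → residual add
    return x + dropout(sublayer(norm(x)))

norm, drop = nn.LayerNorm(16), nn.Dropout(0.1)
ffn = nn.Sequential(nn.Linear(16, 64), nn.GELU(), nn.Linear(64, 16))
x = torch.randn(2, 5, 16)
print(post_ln_step(x, ffn, norm, drop).shape, pre_ln_step(x, ffn, norm, drop).shape)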

transformer

Building TransformerBlock

This wires together the classes above.

class TransformerBlock(nn.Module):
    """
    Bidirectional Encoder = Transformer (self-attention)
    Transformer = MultiHead_Attention + Feed_Forward with sublayer connection
    """

    def __init__(self, hidden, attn_heads, feed_forward_hidden, dropout):
        """
        :param hidden: hidden size of transformer
        :param attn_heads: head sizes of multi-head attention
        :param feed_forward_hidden: feed_forward_hidden, usually 4*hidden_size
        :param dropout: dropout rate
        """

        super().__init__()
        self.attention = MultiHeadedAttention(h=attn_heads, d_model=hidden)
        self.feed_forward = PositionwiseFeedForward(d_model=hidden, d_ff=feed_forward_hidden, dropout=dropout)
        self.input_sublayer = SublayerConnection(size=hidden, dropout=dropout)
        self.output_sublayer = SublayerConnection(size=hidden, dropout=dropout)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, mask):
        x = self.input_sublayer(x, lambda _x: self.attention.forward(_x, _x, _x, mask=mask))
        x = self.output_sublayer(x, self.feed_forward)
        return self.dropout(x)

forward

| argument | shape | meaning |
|---|---|---|
| x | [B, L, hidden] | the embedding or the previous layer's output |
| mask | [B, 1, L, L] here (anything broadcastable works) | attention mask |

The attention sublayer

x = self.input_sublayer(
    x,
    lambda _x: self.attention.forward(_x, _x, _x, mask=mask)
)

which is equivalent to:

x_norm = LayerNorm(x)
attn_out = MultiHeadAttention(x_norm, x_norm, x_norm, mask)
x = x + Dropout(attn_out)

  • attention comes before the FFN

  • LayerNorm comes before the residual add (Pre-LN)

  • token interaction happens here

The FFN sublayer

x = self.output_sublayer(x, self.feed_forward)

which is equivalent to:

x_norm = LayerNorm(x)
ffn_out = FeedForward(x_norm)
x = x + Dropout(ffn_out)

  • every token here is already a contextualized representation
  • the FFN adds per-token nonlinear capacity

bert

Building the BERT class, which maps a discrete token sequence into multi-layer contextualized continuous representations.

class BERT(nn.Module):
    """
    BERT model : Bidirectional Encoder Representations from Transformers.
    """

    def __init__(self, vocab_size, hidden=768, n_layers=12, attn_heads=12, dropout=0.1):
        """
        :param vocab_size: vocab_size of total words
        :param hidden: BERT model hidden size — d_model in the paper, the embedding dimension
        :param n_layers: number of Transformer blocks (layers) — 12 / 24 in the paper
        :param attn_heads: number of attention heads
        :param dropout: dropout rate
        """

        super().__init__()
        self.hidden = hidden
        self.n_layers = n_layers
        self.attn_heads = attn_heads

        # paper noted they used 4*hidden_size for ff_network_hidden_size
        self.feed_forward_hidden = hidden * 4

        # embedding for BERT, sum of positional, segment, token embeddings
        self.embedding = BERTEmbedding(vocab_size=vocab_size, embed_size=hidden)

        # multi-layers transformer blocks, deep network
        self.transformer_blocks = nn.ModuleList(
            [TransformerBlock(hidden, attn_heads, hidden * 4, dropout) for _ in range(n_layers)])

    def forward(self, x, segment_info):
        # attention masking for padded tokens
        # torch.ByteTensor([batch_size, 1, seq_len, seq_len])
        # x > 0 marks non-pad positions, x.size = [B, L]
        # x.unsqueeze(1).repeat(1, x.size(1), 1) = [B, L, L]
        mask = (x > 0).unsqueeze(1).repeat(1, x.size(1), 1).unsqueeze(1)
        # mask shape [B, 1, L, L]; it is broadcast across the heads later

        # embedding the indexed sequence to sequence of vectors
        x = self.embedding(x, segment_info)

        # running over multiple transformer blocks
        for transformer in self.transformer_blocks:
            x = transformer.forward(x, mask)

        return x  # last-layer hidden states [B, L, hidden], with no extra head on top

Building the attention mask

mask = (x > 0).unsqueeze(1).repeat(1, x.size(1), 1).unsqueeze(1)

This is an encoder-style, fully visible mask: there is no future masking, which is the difference from GPT.
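A toy check of the mask construction (my own sketch): a batch of 2 sequences, the second one padded with zeros after position 2.

import torch

x = torch.tensor([[5, 8, 2, 9],
                  [7, 3, 0, 0]])                 # 0 = <pad>
mask = (x > 0).unsqueeze(1).repeat(1, x.size(1), 1).unsqueeze(1)
print(mask.shape)       # torch.Size([2, 1, 4, 4])
print(mask[1, 0])       # every query row of the second sample can see columns 0-1 only:
# tensor([[ True,  True, False, False],
#         [ True,  True, False, False],
#         [ True,  True, False, False],
#         [ True,  True, False, False]])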

Different downstream tasks then need different mappings on top of it:

| task | what it needs |
|---|---|
| MLM | token-level vocab logits |
| NSP | sentence-level binary logits |
| classification | the [CLS] vector |
| NER | token-level tag logits |

language_model

This turns the bare BERT encoder into a trainable pre-training model.

Building BERTLM

class BERTLM(nn.Module):
    """
    BERT Language Model
    Next Sentence Prediction Model + Masked Language Model
    """

    def __init__(self, bert: BERT, vocab_size):
        """
        :param bert: BERT model which should be trained
        :param vocab_size: total vocab size for masked_lm
        """

        super().__init__()
        self.bert = bert
        self.next_sentence = NextSentencePrediction(self.bert.hidden)
        self.mask_lm = MaskedLanguageModel(self.bert.hidden, vocab_size)

NSP head

class NextSentencePrediction(nn.Module):
    """
    2-class classification model : is_next, is_not_next
    """

    def __init__(self, hidden):
        """
        :param hidden: BERT model output size
        """
        super().__init__()
        self.linear = nn.Linear(hidden, 2)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        return self.softmax(self.linear(x[:, 0]))  # x[:, 0] is [B, hidden]
        # indexing one dimension with an integer removes that dimension

x[:, 0] takes the representation of token 0 for every sample,

because in BERT position 0 ([CLS]) is designed to act as the global semantic summary of the whole input.

The output is the is_next / not_next decision.

dataset.py returns is_next_label with every sample, and at training time:

nsp_loss = criterion(nsp_logits, is_next_label)   # criterion = nn.NLLLoss(...)

Why later models dropped NSP

Because this binary task does not effectively teach the model sentence-level semantic relations, and it interferes with the more important MLM learning.

NSP's negative samples are too easy, and the task occupies and distorts the representational capacity of [CLS]. In follow-up work:

  • RoBERTa: removes NSP entirely, keeps only MLM, and learns contextual relations through more data and stronger training recipes
  • ALBERT: replaces NSP with SOP (Sentence Order Prediction), which constructs harder, more semantically relevant negatives

MLM head

class MaskedLanguageModel(nn.Module):
    """
    predicting origin token from masked input sequence
    n-class classification problem, n-class = vocab_size
    """

    def __init__(self, hidden, vocab_size):
        """
        :param hidden: output size of BERT model
        :param vocab_size: total vocab size
        """
        super().__init__()
        self.linear = nn.Linear(hidden, vocab_size)
        self.softmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        return self.softmax(self.linear(x))

Input: x: [B, L, hidden]; output: [B, L, vocab_size].

A vocabulary distribution is predicted for every token position.

mlm_criterion = nn.NLLLoss(ignore_index=0)  # the loss is only computed at masked positions (label 0 is ignored)

A single forward pass computes both tasks:

class BERTLM(nn.Module):
    def forward(self, x, segment_label):
        x = self.bert(x, segment_label)
        return self.next_sentence(x), self.mask_lm(x)

The BERT encoder runs only once; its output is fed to both heads.

The output is a tuple:

(
    nsp_logits,  # [B, 2]
    mlm_logits   # [B, L, vocab_size]
)

trainer


optim_schedule

Building ScheduledOptim

It controls the learning rate.

class ScheduledOptim():
    '''A simple wrapper class for learning rate scheduling'''

    def __init__(self, optimizer, d_model, n_warmup_steps):
        # optimizer: the actual Adam optimizer
        # d_model: BERT's hidden size
        # n_warmup_steps: number of warm-up steps
        self._optimizer = optimizer
        self.n_warmup_steps = n_warmup_steps
        self.n_current_steps = 0
        self.init_lr = np.power(d_model, -0.5)

    def step_and_update_lr(self):
        "Step with the inner optimizer"
        self._update_learning_rate()
        self._optimizer.step()

    def zero_grad(self):
        "Zero out the gradients by the inner optimizer"
        self._optimizer.zero_grad()

    def _get_lr_scale(self):
        return np.min([
            np.power(self.n_current_steps, -0.5),
            np.power(self.n_warmup_steps, -1.5) * self.n_current_steps])

    def _update_learning_rate(self):
        ''' Learning rate scheduling per step '''

        self.n_current_steps += 1
        lr = self.init_lr * self._get_lr_scale()

        for param_group in self._optimizer.param_groups:
            param_group['lr'] = lr

It is a learning-rate scheduler wrapped around Adam, using the Noam schedule from the Transformer paper.

In Transformer / BERT training the learning rate must not be too large at the start, and must then gradually decay; otherwise:

  • gradients can blow up early on
  • training fails to converge or oscillates later

"Attention Is All You Need" proposes the formula:
$$
lr = d_{model}^{-0.5} \cdot \min\left(step^{-0.5},\ step \cdot warmup^{-1.5}\right)
$$
step_and_update_lr:

  1. first updates the learning rate
  2. then calls optimizer.step()

This guarantees that every step uses the lr that corresponds to the current step count.
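A few concrete values of the schedule (my own sketch), with d_model=768 and warmup=10000:

import numpy as np

d_model, warmup = 768, 10000
lr = lambda step: np.power(d_model, -0.5) * min(np.power(step, -0.5), step * np.power(warmup, -1.5))

for step in [1, 100, 1000, 10000, 100000]:
    print(step, f"{lr(step):.2e}")
# lr rises linearly until step == warmup (its peak), then decays as step^-0.5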

pretrain

The core of the whole repo: the BERTTrainer class, which does three things:

  1. assembles BERT + the pre-training heads
  2. defines the loss / optimizer / device
  3. runs the full training loop

__init__

class BERTTrainer:
    """
    BERTTrainer make the pretrained BERT model with two LM training method.

        1. Masked Language Model : 3.3.1 Task #1: Masked LM
        2. Next Sentence prediction : 3.3.2 Task #2: Next Sentence Prediction

    please check the details on README.md with simple example.
    """

    def __init__(self, bert: BERT, vocab_size: int,
                 train_dataloader: DataLoader, test_dataloader: DataLoader = None,
                 lr: float = 1e-4, betas=(0.9, 0.999), weight_decay: float = 0.01, warmup_steps=10000,
                 with_cuda: bool = True, cuda_devices=None, log_freq: int = 10):
        """
        :param bert: BERT model which you want to train
        :param vocab_size: total word vocab size
        :param train_dataloader: train dataset data loader
        :param test_dataloader: test dataset data loader [can be None]
        :param lr: learning rate of optimizer
        :param betas: Adam optimizer betas
        :param weight_decay: Adam optimizer weight decay param
        :param with_cuda: training with cuda
        :param log_freq: logging frequency of the batch iteration
        """

        # Setup cuda device for BERT training, argument -c, --cuda should be true
        cuda_condition = torch.cuda.is_available() and with_cuda
        self.device = torch.device("cuda:0" if cuda_condition else "cpu")

        # This BERT model will be saved every epoch
        self.bert = bert
        # Initialize the BERT Language Model, with BERT model
        self.model = BERTLM(bert, vocab_size).to(self.device)

        # Distributed GPU training if CUDA can detect more than 1 GPU
        if with_cuda and torch.cuda.device_count() > 1:
            print("Using %d GPUS for BERT" % torch.cuda.device_count())
            self.model = nn.DataParallel(self.model, device_ids=cuda_devices)

        # Setting the train and test data loader
        self.train_data = train_dataloader
        self.test_data = test_dataloader

        # Setting the Adam optimizer with hyper-param
        self.optim = Adam(self.model.parameters(), lr=lr, betas=betas, weight_decay=weight_decay)
        self.optim_schedule = ScheduledOptim(self.optim, self.bert.hidden, n_warmup_steps=warmup_steps)

        # Using Negative Log Likelihood Loss function for predicting the masked_token
        self.criterion = nn.NLLLoss(ignore_index=0)

        self.log_freq = log_freq

        print("Total Parameters:", sum([p.nelement() for p in self.model.parameters()]))

  • self.bert: the bare encoder (the thing that gets saved)
  • self.model: BERT + MLM + NSP heads (the thing that gets trained)

👉 what is trained is BERTLM, not BERT

The DataLoader feeds in the dictionary built in dataset.py:

{
    "bert_input",
    "segment_label",
    "bert_label",
    "is_next"
}

Optimizer + scheduler: from here on the trainer only calls optim_schedule and never touches Adam directly.

Loss function: NLLLoss.

| | NLLLoss | CrossEntropyLoss |
|---|---|---|
| input | log-probabilities | raw logits |
| applies softmax internally | ❌ no | ✅ yes |
| takes log internally | ❌ no | ✅ yes |
| essence | negative log-likelihood | LogSoftmax + NLL |
| ease of use | easy to misuse | safer |

Why the same NLLLoss instance can be reused (checked in the sketch after this list):

  • NSP: [B, 2] vs [B]
  • MLM: [B, L, vocab] vs [B, L]
  • non-masked labels are 0 and get ignored
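A small check (my own sketch, not repo code): LogSoftmax + NLLLoss equals CrossEntropyLoss, and ignore_index=0 removes label-0 positions from the average. Note that because the same ignore_index=0 criterion is also applied to the NSP branch here, is_next = 0 samples contribute nothing to the NSP loss in this repo.

import torch
import torch.nn as nn

logits = torch.randn(4, 7)                       # [N, C]
target = torch.tensor([3, 0, 5, 1])              # the label 0 is ignored below

nll = nn.NLLLoss(ignore_index=0)
ce = nn.CrossEntropyLoss(ignore_index=0)

log_probs = nn.LogSoftmax(dim=-1)(logits)
print(torch.allclose(nll(log_probs, target), ce(logits, target)))   # True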

iteration: where training actually happens

def iteration(self, epoch, data_loader, train=True):
    """
    loop over the data_loader for training or testing
    if on train status, backward operation is activated
    and also auto save the model every epoch

    :param epoch: current epoch index
    :param data_loader: torch.utils.data.DataLoader for iteration
    :param train: boolean value of is train or test
    :return: None
    """
    str_code = "train" if train else "test"

    # Setting the tqdm progress bar
    data_iter = tqdm.tqdm(enumerate(data_loader),
                          desc="EP_%s:%d" % (str_code, epoch),
                          total=len(data_loader),
                          bar_format="{l_bar}{r_bar}")

    avg_loss = 0.0
    total_correct = 0
    total_element = 0

    for i, data in data_iter:
        # 0. batch_data will be sent into the device(GPU or cpu)
        data = {key: value.to(self.device) for key, value in data.items()}

        # 1. forward the next_sentence_prediction and masked_lm model
        next_sent_output, mask_lm_output = self.model.forward(data["bert_input"], data["segment_label"])

        # 2-1. NLL(negative log likelihood) loss of is_next classification result
        next_loss = self.criterion(next_sent_output, data["is_next"])

        # 2-2. NLLLoss of predicting masked token word
        mask_loss = self.criterion(mask_lm_output.transpose(1, 2), data["bert_label"])

        # 2-3. Adding next_loss and mask_loss : 3.4 Pre-training Procedure
        loss = next_loss + mask_loss

        # 3. backward and optimization only in train
        if train:
            self.optim_schedule.zero_grad()
            loss.backward()
            self.optim_schedule.step_and_update_lr()

        # next sentence prediction accuracy
        correct = next_sent_output.argmax(dim=-1).eq(data["is_next"]).sum().item()
        avg_loss += loss.item()
        total_correct += correct
        total_element += data["is_next"].nelement()

        post_fix = {
            "epoch": epoch,
            "iter": i,
            "avg_loss": avg_loss / (i + 1),
            "avg_acc": total_correct / total_element * 100,
            "loss": loss.item()
        }

        if i % self.log_freq == 0:
            data_iter.write(str(post_fix))

    print("EP%d_%s, avg_loss=" % (epoch, str_code), avg_loss / len(data_iter), "total_acc=",
          total_correct * 100.0 / total_element)

Forward pass (BERT + the two heads):

| output | shape |
|---|---|
| next_sent_output | [B, 2] |
| mask_lm_output | [B, L, vocab_size] |
mask_loss = self.criterion(mask_lm_output.transpose(1, 2), data["bert_label"])

Why transpose(1, 2)?

Because nn.NLLLoss expects the class dimension at dim=1, while the MLM output puts the vocab dimension last, so a transpose is required:

input:  [N, C, *]
target: [N, *]
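A shape check (my own sketch): the MLM output must be permuted to [B, vocab, L] before it matches NLLLoss's [N, C, *] / [N, *] convention.

import torch
import torch.nn as nn

B, L, V = 2, 6, 30
mlm_log_probs = nn.LogSoftmax(dim=-1)(torch.randn(B, L, V))   # [B, L, V]
labels = torch.randint(1, V, (B, L))                          # [B, L]
labels[:, 0] = 0      # pretend the first position of each sample is not masked → ignored

criterion = nn.NLLLoss(ignore_index=0)
loss = criterion(mlm_log_probs.transpose(1, 2), labels)       # [B, V, L] vs [B, L]
print(loss.item())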

Accuracy is only tracked for NSP:

correct = next_sent_output.argmax(dim=-1).eq(data["is_next"])

because:

  • MLM accuracy is not very intuitive to interpret

  • and the proportion of masked tokens is low

Only the encoder is saved, not the heads, because after pre-training the MLM / NSP heads are usually discarded and downstream tasks attach new heads.

Overall flow:

Dataset → DataLoader

BERTLM.forward
├─ BERT Encoder
├─ NSP head
└─ MLM head

loss = NSP + MLM

backward

ScheduledOptim.step

Summary

BERT = Embedding → multi-layer Transformer encoder → pre-training tasks (MLM + NSP)

| paper component | code file |
|---|---|
| Input Representation | embedding.py (token / segment / position) |
| Transformer Encoder | transformer.py + bert.py |
| MLM | language_model.py, MaskedLanguageModel |
| NSP | language_model.py, NextSentencePrediction |
| Training | pretrain.py + optim_schedule.py |

Downstream fine-tuning:

| task type | input example | output | representative dataset |
|---|---|---|---|
| sentence-pair classification | [CLS] sentence A [SEP] sentence B [SEP] | classify from the [CLS] vector | MNLI, QQP, QNLI |
| single-sentence classification | [CLS] sentence A [SEP] | classify from the [CLS] vector | SST-2, CoLA |
| question answering | [CLS] question [SEP] context [SEP] | predict start/end positions over all tokens | SQuAD |
| single-sentence tagging | [CLS] sentence A [SEP] | classify each token vector separately | CoNLL-2003 NER |
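A hypothetical fine-tuning head (not part of this repo) for the first two rows: single-sentence or sentence-pair classification from the [CLS] vector produced by the BERT class above.

import torch.nn as nn

class BERTForClassification(nn.Module):
    def __init__(self, bert, num_classes):
        super().__init__()
        self.bert = bert                              # the pre-trained encoder from bert.py
        self.classifier = nn.Linear(bert.hidden, num_classes)

    def forward(self, x, segment_info):
        hidden = self.bert(x, segment_info)           # [B, L, hidden]
        cls_vec = hidden[:, 0]                        # [CLS] representation, [B, hidden]
        return self.classifier(cls_vec)               # [B, num_classes] logits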

The common benchmark corpora in more detail.

Sentence-pair classification:

| dataset | input | output | what the model learns |
|---|---|---|---|
| MNLI | premise + hypothesis | entailment / contradiction / neutral | the logical-semantic relation between two sentences |
| QQP | two question sentences | semantically equivalent or not (yes / no) | recognizing the same meaning under different wording |

Single-sentence classification:

| dataset | input | output | what the model learns |
|---|---|---|---|
| SST-2 | a single sentence | positive / negative | sentiment analysis |
| CoLA | a single sentence | grammatically acceptable or not (yes/no) | implicit grammatical rules rather than semantics |

Single-sentence tagging:

| dataset | input | output | what the model learns |
|---|---|---|---|
| CoNLL-2003 NER | a single sentence | a tag prediction for every token | per-token understanding + contextual dependencies |