Common Interview Questions
1. Attention Mechanisms¶
MHA¶
multiheadattention(torch.nn.Module):
This defines a class named multiheadattention that inherits from nn.Module, the base class of every neural-network module in PyTorch. nn.Module provides the core machinery: parameter management, submodule registration, the forward-pass interface, model saving/loading, .to(device), .eval(), and so on. Inheriting from it makes multiheadattention a trainable network module: layers defined inside it (such as nn.Linear) are automatically registered as learnable parameters, so they can be managed by an optimizer and participate in backpropagation. Without inheriting from nn.Module, the class would just be a plain Python class with none of the framework's functionality.
super().__init__():
This line calls the constructor of the parent class nn.Module. It completes the base-class initialization: setting up the parameter containers, the submodule dictionary, buffer structures, and other internal machinery. Without it the class may still be syntactically valid, but parameters will not be registered correctly, model.parameters() may be empty, the optimizer cannot update anything, and model state cannot be saved properly. In short, it ensures the class becomes a fully initialized PyTorch module.
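A quick way to see the registration behavior (a minimal sketch; WithInit is an illustrative name, not from the notes above):
import torch.nn as nn

class WithInit(nn.Module):
    def __init__(self):
        super().__init__()          # sets up the parameter/submodule registries
        self.fc = nn.Linear(4, 2)   # automatically registered as a submodule

print(len(list(WithInit().parameters())))  # 2: the Linear's weight and bias are visible to optimizers
Skipping super().__init__() fails even earlier in practice: the assignment self.fc = nn.Linear(...) then raises AttributeError, because the registries the assignment hooks into do not exist yet.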
import torch
import math

class multiheadattention(torch.nn.Module):
    def __init__(self, d, h):
        super().__init__()
        self.d = d
        self.h = h
        self.k = d // h
        self.wqkv = torch.nn.Linear(d, d*3)
        self.wo = torch.nn.Linear(d, d)

    def forward(self, x, mask=None):
        B, L, D = x.shape
        qkv = self.wqkv(x)
        q, k, v = torch.chunk(qkv, 3, -1)
        # The head split can also be written with reshape/permute instead of view/transpose
        q = q.view(B, L, self.h, self.k).transpose(1, 2)
        k = k.view(B, L, self.h, self.k).transpose(1, 2)
        v = v.view(B, L, self.h, self.k).transpose(1, 2)
        attention_score = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.k)
        if mask is not None:
            attention_score = attention_score.masked_fill(mask=mask, value=-1e9)
        attention_weight = torch.softmax(attention_score, dim=-1)
        context = torch.matmul(attention_weight, v).transpose(1, 2).contiguous().view(B, L, D)
        output = self.wo(context)
        return output, attention_weight
# --- Test code ---
batch_size = 5
max_seq_len = 10
d_model = 64
head = 4
x = torch.randn(batch_size, max_seq_len, d_model)
attention_model = multiheadattention(d_model, head)
output, attention = attention_model(x)
print("Code ran successfully!")
print("Output tensor shape:", output.shape)
print("Attention weight shape:", attention.shape)
Code ran successfully! Output tensor shape: torch.Size([5, 10, 64]) Attention weight shape: torch.Size([5, 4, 10, 10])
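The mask argument is never exercised in the test above. A minimal causal-mask sketch, reusing the model and inputs just defined (masked_fill broadcasts the (L, L) mask up to (B, h, L, L)):
# True marks positions to block: the upper triangle, i.e. future tokens
causal_mask = torch.triu(torch.ones(max_seq_len, max_seq_len, dtype=torch.bool), diagonal=1)
masked_output, masked_attention = attention_model(x, mask=causal_mask)
print(masked_attention[0, 0])  # each row attends only to itself and earlier positions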
MQA¶
Multi-Query Attention: all query heads share a single key/value head
Differences from the original MultiHeadAttention:
- Q: same as in MHA, shape -> [B, h, L, d_k]
- K/V: only 1 shared head is produced, shape -> [B, 1, L, d_k], so at inference time the KV cache stores only 1 copy (instead of h copies); a quick size comparison follows below.
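A back-of-the-envelope comparison of per-layer, per-token KV-cache size (the numbers below are illustrative assumptions, not from the original text):
# Illustrative only: d_model = 4096, h = 32 heads, fp16 (2 bytes per value)
d_model, h, bytes_per_value = 4096, 32, 2
mha_kv_bytes = 2 * d_model * bytes_per_value         # K and V across h heads: 16 KiB
mqa_kv_bytes = 2 * (d_model // h) * bytes_per_value  # K and V for 1 shared head: 0.5 KiB
print(mha_kv_bytes, mqa_kv_bytes)                    # 16384 512 -> an h-fold reduction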
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiQueryAttention(nn.Module):
    def __init__(self, d_model, num_head):
        super(MultiQueryAttention, self).__init__()
        assert d_model % num_head == 0, "d_model must be divisible by num_head"
        self.d_model = d_model
        self.num_head = num_head
        self.d_k = d_model // num_head
        # Differences from the MHA implementation:
        # - Q is still projected to d_model (then reshaped into h heads)
        # - K/V are projected to d_k only (single-head width), one copy each
        self.wq = nn.Linear(d_model, d_model)        # multi-head Q
        self.wkv = nn.Linear(d_model, 2 * self.d_k)  # shared K and V (single-head width)
        self.wo = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        B, L, _ = x.shape
        # 1) Compute Q, K, V
        q = self.wq(x)                     # [B, L, d_model]
        kv = self.wkv(x)                   # [B, L, 2*d_k]
        k, v = torch.chunk(kv, 2, dim=-1)  # [B, L, d_k], [B, L, d_k]
        # 2) Shape bookkeeping
        # Q: [B, L, h, d_k] -> [B, h, L, d_k]
        q = q.view(B, L, self.num_head, self.d_k).transpose(1, 2)  # [B, h, L, d_k]
        # Shared K/V: add a pseudo head dim of size 1 so they broadcast across h
        # K/V: [B, L, d_k] -> [B, 1, L, d_k]; unsqueeze(dim) inserts a length-1 axis at position dim
        k = k.unsqueeze(1)  # [B, 1, L, d_k]
        v = v.unsqueeze(1)  # [B, 1, L, d_k]
        # 3) Attention scores: Q against the shared K
        # scores: [B, h, L, L]; K broadcasts along the head dimension
        scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.d_k)  # [B, h, L, L]
        if mask is not None:
            # mask must broadcast to [B, h, L, L]
            # e.g. [B, 1, 1, L] (causal/padding) or [B, 1, L, L]
            scores = scores.masked_fill(mask, -1e9)
        attn = torch.softmax(scores, dim=-1)  # [B, h, L, L]
        # 4) Weighted sum against the shared V (again via broadcasting)
        context = torch.matmul(attn, v)  # [B, h, L, d_k]
        # 5) Restore [B, L, d_model] and project out
        context = context.transpose(1, 2).contiguous()  # [B, L, h, d_k]
        context = context.view(B, L, self.d_model)      # [B, L, d_model]
        output = self.wo(context)                       # [B, L, d_model]
        return output, attn

# --- Quick test (mirrors the MHA test above) ---
if __name__ == "__main__":
    batch_size = 2
    d_model = 10
    head = 2
    max_seq_len = 5
    x = torch.randn(batch_size, max_seq_len, d_model)
    attention_model = MultiQueryAttention(d_model, head)
    output, attention = attention_model(x)
    print("Code ran successfully! (MQA)")
    print("Output tensor shape:", output.shape)        # expected: [B, L, d_model]
    print("Attention weight shape:", attention.shape)  # expected: [B, h, L, L]
Code ran successfully! (MQA) Output tensor shape: torch.Size([2, 5, 10]) Attention weight shape: torch.Size([2, 2, 5, 5])
GQA¶
Grouped-Query Attention: a compromise that splits the h query heads into g groups
GQA: split the h query heads into g groups; each group shares one set of K/V
- num_head = h
- num_kv_head = g (1 < g <= h, and h % g == 0)
- Shape conventions: Q: [B, h, L, d_k]; grouped K, V: [B, g, L, d_k]
For the computation, Q is reshaped to [B, g, h_per_group, L, d_k] and attends to its own group's K, V;
the result is finally restored to [B, h, L, d_k].
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class GroupedQueryAttention(nn.Module):
    def __init__(self, d_model, num_head, num_kv_head):
        super(GroupedQueryAttention, self).__init__()
        assert d_model % num_head == 0, "d_model must be divisible by num_head"
        assert 1 <= num_kv_head <= num_head, "num_kv_head must lie in [1, num_head]"
        assert num_head % num_kv_head == 0, "num_head must be divisible by num_kv_head (equal-sized groups)"
        self.d_model = d_model
        self.num_head = num_head        # h
        self.num_kv_head = num_kv_head  # g
        self.d_k = d_model // num_head
        self.h_per_group = self.num_head // self.num_kv_head  # h/g
        # Q is still projected to d_model (then reshaped into h heads)
        self.wq = nn.Linear(d_model, d_model)
        # K/V are projected to g * d_k only (then reshaped into g "KV heads")
        self.wkv = nn.Linear(d_model, 2 * self.num_kv_head * self.d_k)
        self.wo = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        B, L, _ = x.shape
        # 1) Projections
        # Q: [B, L, d_model]
        # KV: [B, L, 2 * g * d_k] -> split -> [B, L, g * d_k] each
        q = self.wq(x)
        kv = self.wkv(x)
        k, v = torch.chunk(kv, 2, dim=-1)
        # 2) Shape bookkeeping
        # Q -> [B, h, L, d_k]
        q = q.view(B, L, self.num_head, self.d_k).transpose(1, 2)
        # K, V -> [B, g, L, d_k]
        k = k.view(B, L, self.num_kv_head, self.d_k).transpose(1, 2)
        v = v.view(B, L, self.num_kv_head, self.d_k).transpose(1, 2)
        # Group Q: [B, h, L, d_k] -> [B, g, h_per_group, L, d_k]; this contiguous grouping is the standard GQA layout
        qg = q.view(B, self.num_kv_head, self.h_per_group, L, self.d_k)
        # To do a batched matmul with the per-group K, V, add an in-group head dim of 1 for broadcasting
        # Kg, Vg: [B, g, 1, L, d_k]
        Kg = k.unsqueeze(2)
        Vg = v.unsqueeze(2)
        # 3) Per-group attention scores: [B, g, h_per_group, L, L]
        # Equivalent to: scores_g[b,g,hg] = qg[b,g,hg] @ Kg[b,g,0]^T / sqrt(d_k)
        scores_g = torch.matmul(qg, Kg.transpose(-1, -2)) / math.sqrt(self.d_k)
        if mask is not None:
            # mask must broadcast to [B, 1 or g, 1 or h_per_group, L, L] (or the flattened [B, h, L, L])
            # most common: supply [B, 1, 1, L, L] (or a causal/padding combination like [B, 1, 1, 1, L])
            scores_g = scores_g.masked_fill(mask, -1e9)
        attn_g = torch.softmax(scores_g, dim=-1)  # [B, g, h_per_group, L, L]
        # 4) Per-group weighted sum: context_g: [B, g, h_per_group, L, d_k]
        context_g = torch.matmul(attn_g, Vg)
        # 5) Restore all heads: merge g and h_per_group back into h
        context = context_g.reshape(B, self.num_head, L, self.d_k).transpose(1, 2).reshape(B, L, self.d_model)
        # context = context_g.reshape(B, self.num_head, L, self.d_k)  # [B, h, L, d_k]
        # context = context.transpose(1, 2).contiguous()              # [B, L, h, d_k]
        # context = context.view(B, L, self.d_model)                  # [B, L, d_model]
        output = self.wo(context)
        # Also return the attention weights, flattened back to [B, h, L, L] for easier visualization
        attn = attn_g.reshape(B, self.num_head, L, L)
        return output, attn

# --- Quick test (same style as the MHA test) ---
if __name__ == "__main__":
    batch_size = 2
    d_model = 12
    num_head = 6     # h
    num_kv_head = 3  # g (2 Q heads per group)
    max_seq_len = 5
    x = torch.randn(batch_size, max_seq_len, d_model)
    gqa = GroupedQueryAttention(d_model, num_head, num_kv_head)
    out, attn = gqa(x)
    print("Code ran successfully! (GQA)")
    print("Output tensor shape:", out.shape)  # expected: [B, L, d_model]
    print("Attention shape:", attn.shape)     # expected: [B, h, L, L]
Code ran successfully! (GQA) Output tensor shape: torch.Size([2, 5, 12]) Attention shape: torch.Size([2, 6, 5, 5])
Practice
import torch
import torch.nn as nn
import math

class GQA(nn.Module):
    def __init__(self, dim, q_head, kv_head):
        super().__init__()
        self.d = dim
        self.q_head = q_head
        self.kv_head = kv_head
        self.d_k = dim // q_head
        self.h_per_kv = q_head // kv_head
        self.wq = nn.Linear(dim, dim)
        self.wkv = nn.Linear(dim, 2*kv_head*self.d_k)
        self.wo = nn.Linear(dim, dim)

    def forward(self, x, mask=None):
        B, L, D = x.shape
        q = self.wq(x)
        kv = self.wkv(x)
        k, v = torch.chunk(kv, 2, -1)
        q = q.view(B, L, self.q_head, self.d_k).transpose(1, 2)
        k = k.view(B, L, self.kv_head, self.d_k).transpose(1, 2)
        v = v.view(B, L, self.kv_head, self.d_k).transpose(1, 2)
        qg = q.view(B, self.kv_head, self.h_per_kv, L, self.d_k)
        kg = k.unsqueeze(2)
        vg = v.unsqueeze(2)
        attention_scores = torch.matmul(qg, kg.transpose(-1, -2)) / math.sqrt(self.d_k)
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask, -1e9)
        attention_weight = nn.functional.softmax(attention_scores, dim=-1)
        context = torch.matmul(attention_weight, vg).reshape(B, self.q_head, L, self.d_k).transpose(1, 2).reshape(B, L, self.d)
        output = self.wo(context)
        return output, attention_weight

B = 10
L = 25
D = 64
q_head = 8
kv_head = 4
x = torch.randn(B, L, D)
GQA_model = GQA(D, q_head, kv_head)
output, attn = GQA_model(x)
print(output.shape)
print(attn.shape)
torch.Size([10, 25, 64]) torch.Size([10, 4, 2, 25, 25])
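Note that this practice version returns the attention weights in their grouped shape [B, g, h_per_group, L, L] instead of flattening back to [B, h, L, L] as the reference implementation does; a one-line reshape recovers the per-head layout (using the variables from the test above):
attn_per_head = attn.reshape(B, q_head, L, L)
print(attn_per_head.shape)  # torch.Size([10, 8, 25, 25])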
2. AUC¶
AUC¶
Rank-based (Mann–Whitney U) computation of AUC.
Inputs:
- labels: List[int] or 1D numpy array of ground-truth labels in {0, 1}
- scores: List[float] or 1D numpy array of model scores; higher means more likely positive
Output:
- auc: float, in [0, 1]
Core idea: sort the labels
- Sort by predicted score in ascending order
- Scan the sorted samples
- At each positive sample, count how many negatives precede it; all of those negatives are "correctly ranked below" this positive
import numpy as np

def auc_rank(labels, scores):
    # Convert to numpy arrays for sorting and vectorized ops
    labels = np.asarray(labels)
    scores = np.asarray(scores)
    print(f"labels:{labels}")
    print(f"scores:{scores}")
    # Indices that sort the scores in ascending order
    order = np.argsort(scores)
    print(f"order:{order}")
    # Reorder the labels accordingly
    labels_sorted = labels[order]
    print(f"labels_sorted:{labels_sorted}")
    # Number of positives |P|
    n_pos = np.sum(labels_sorted == 1)
    print(f"n_pos:{n_pos}")
    # Number of negatives |N|
    n_neg = np.sum(labels_sorted == 0)
    print(f"n_neg:{n_neg}")
    # Negatives scanned so far (prefix count)
    neg_count = 0
    # Number of correctly ordered positive-negative pairs
    correct = 0.0
    # Scan from low score to high score
    for l in labels_sorted:
        if l == 1:
            # Current sample is positive:
            # every negative before it satisfies score_neg < score_pos
            correct += neg_count
        else:
            # Current sample is negative: bump the counter
            neg_count += 1
    # AUC = correctly ordered pairs / total positive-negative pairs
    return correct / (n_pos * n_neg)

label = [0, 1, 0, 0, 1, 0, 0, 1]
q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.8]
auc = auc_rank(label, q)
print(f"auc:{auc}")
labels:[0 1 0 0 1 0 0 1] scores:[0.1 0.9 0.2 0.8 1. 0.2 0.3 0.8] order:[0 2 5 6 7 3 1 4] labels_sorted:[0 0 0 0 1 0 1 1] n_pos:3 n_neg:5 auc:0.9333333333333333
However, the method above has a tie bug hidden in np.argsort(scores): when a positive and a negative sample share the same score, the pair should contribute 0.5, not 1 or 0. (A quick check follows.)
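A quick check of the tie bug against scikit-learn, using the label and q defined above (assuming scikit-learn is installed):
from sklearn.metrics import roc_auc_score

print(auc_rank(label, q))       # 0.9333...: the tied pair (both scores 0.8) was counted as 0 here; a different tie order would count it as 1
print(roc_auc_score(label, q))  # 0.9666...: the tied pair correctly contributes 0.5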
import numpy as np

def auc_rank_fixed(q_list, label):
    q_list = np.array(q_list, dtype=float)
    label = np.array(label)
    # ── Step 1: sort by score, ascending ───────────────────────────────
    # argsort returns the original indices in sorted order, e.g.:
    # q = [0.1, 0.9, 0.2, 0.8, 1.0, 0.2, 0.3, 0.8]
    # rank_index = [0, 2, 5, 6, 3, 7, 1, 4] (original positions, low score to high)
    rank_index = np.argsort(q_list)
    q_sorted = q_list[rank_index]     # scores, ascending
    label_sorted = label[rank_index]  # labels reordered along with the scores
    total_pos = np.sum(label == 1)    # total positives
    total_neg = np.sum(label == 0)    # total negatives
    # ── Step 2: what AUC means ─────────────────────────────────────────
    # AUC = P(score of a random positive > score of a random negative)
    # Concretely:
    #   concordant pair: positive score > negative score, contributes 1.0
    #   tie:             positive score = negative score, contributes 0.5
    #   discordant pair: positive score < negative score, contributes 0.0
    # AUC = sum of contributions over all positive-negative pairs / number of pairs
    cum_neg = 0       # negatives seen before the current group
    concordant = 0.0  # accumulated contribution
    i, n = 0, len(label_sorted)
    # ── Step 3: walk the samples group by group (equal scores per group) ─
    while i < n:
        # 3a. Find the interval [i, j) sharing the score at position i
        #     e.g. if score 0.2 sits at positions 2 and 3, then i=2, j=4
        j = i
        while j < n and q_sorted[j] == q_sorted[i]:
            j += 1
        # label_sorted[i:j] now holds all samples of the current score group
        group_pos = np.sum(label_sorted[i:j] == 1)  # positives in the group
        group_neg = np.sum(label_sorted[i:j] == 0)  # negatives in the group
        # 3b. Contribution of this group's positives against all negatives
        #
        # Case 1: negatives before the group (cum_neg of them)
        #   their scores are strictly lower than the group's score
        #   -> every positive in the group beats them -> 1.0 per pair
        #   -> add group_pos * cum_neg * 1.0
        #
        # Case 2: negatives inside the group (group_neg of them)
        #   same score as the group's positives -> tie
        #   -> 0.5 per pair
        #   -> add group_pos * group_neg * 0.5
        concordant += group_pos * cum_neg + group_pos * group_neg * 0.5
        # 3c. Group done: fold its negatives into cum_neg
        #     for the higher-scored positives that come later
        cum_neg += group_neg
        i = j  # move to the next group
    # ── Step 4: normalize ──────────────────────────────────────────────
    # total number of positive-negative pairs = total_pos * total_neg
    return concordant / (total_pos * total_neg)

# ── Test ────────────────────────────────────────────────────────────────
label = [0, 1, 0, 0, 1, 0, 0, 1]
q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.8]
print(f"AUC: {auc_rank_fixed(q, label)}")
AUC: 0.9666666666666667
import numpy as np

def auc_rank(q_list, label):
    q_list = np.array(q_list)
    label = np.array(label)
    rank_index = np.argsort(q_list)
    q_list_ranked = q_list[rank_index]
    label_ranked = label[rank_index]
    total_pos = np.sum(label_ranked == 1)
    total_neg = np.sum(label_ranked == 0)
    l, n = 0, len(label_ranked)
    cum_neg, cum_pos = 0, 0
    while l < n:
        r = l
        while r < n and q_list_ranked[l] == q_list_ranked[r]:
            r += 1
        group_neg = np.sum(label_ranked[l:r] == 0)
        group_pos = np.sum(label_ranked[l:r] == 1)
        cum_pos += group_pos * cum_neg + group_pos * group_neg * 0.5
        cum_neg += group_neg
        l = r
    return cum_pos / (total_pos * total_neg)

q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.8]
label = [0, 1, 0, 0, 1, 0, 0, 1]
auc = auc_rank(q, label)
print(f"auc:{auc}")
auc:0.9666666666666667
GAUC¶
Rank-based computation of GAUC (Group AUC).
GAUC computes AUC separately within each user and then takes a weighted average, removing cross-user baseline differences and measuring within-user ranking quality more accurately.
Inputs:
- user_ids: List[str] or 1D numpy array; the user identifier of each sample
- labels: List[int] or 1D numpy array of ground-truth labels in {0, 1}
- scores: List[float] or 1D numpy array of model scores; higher means more likely positive
- weight_type: str, default 'impression'
  Weighting scheme, one of:
  - 'impression': weight each user by impression count (common in industry)
  - 'uniform': equal weight of 1 per user
Outputs:
- gauc: float, the weighted GAUC in [0, 1]
- user_auc_dict: dict mapping each user to its AUC, useful for analyzing per-user ranking quality
Core idea:
- Group the samples by user_id
- Within each user, compute AUC with the rank-based method
- Average the per-user AUCs with the chosen weights
import numpy as np
from collections import defaultdict

def _auc_single_user(user_scores, user_labels):
    """Single-user AUC with tie handling (tied pairs count 0.5)."""
    order = np.argsort(user_scores)
    sorted_scores = user_scores[order]
    sorted_labels = user_labels[order]
    n_pos = np.sum(sorted_labels == 1)
    n_neg = np.sum(sorted_labels == 0)
    cum_neg = 0
    correct_pairs = 0.0
    l, n = 0, len(sorted_labels)
    while l < n:
        r = l
        # Find the block of equal scores
        while r < n and sorted_scores[r] == sorted_scores[l]:
            r += 1
        # [l, r) is a block of tied scores
        group_pos = np.sum(sorted_labels[l:r] == 1)
        group_neg = np.sum(sorted_labels[l:r] == 0)
        # Ties inside the block count 0.5 per pair; negatives before the block count as fully correct
        correct_pairs += group_pos * cum_neg + group_pos * group_neg * 0.5
        # Count the pairs first, then update cum_neg: the block's negatives are tied
        # with its positives, not strictly below them
        cum_neg += group_neg
        l = r
    return correct_pairs / (n_pos * n_neg)

def gauc_rank(user_ids, labels, scores, weight_type='impression'):
    # Stripped of the details, GAUC is just two steps:
    # GAUC = sum(AUC_i * weight_i) / sum(weight_i) over users i
    user_ids = np.array(user_ids)
    labels = np.array(labels)
    scores = np.array(scores)
    user_sample_dict = defaultdict(list)
    for idx, uid in enumerate(user_ids):
        user_sample_dict[uid].append(idx)
    user_auc_dict = {}
    total_weighted_auc = 0.0
    total_weight = 0.0
    for uid, indices in user_sample_dict.items():
        user_labels = labels[indices]
        user_scores = scores[indices]
        n_pos = np.sum(user_labels == 1)
        n_neg = np.sum(user_labels == 0)
        # Users with no positives or no negatives have undefined AUC; skip them
        if n_pos == 0 or n_neg == 0:
            continue
        user_auc = _auc_single_user(user_scores, user_labels)
        user_auc_dict[uid] = user_auc
        if weight_type == 'impression':
            weight = len(indices)
        elif weight_type == 'uniform':
            weight = 1.0
        else:
            raise ValueError(f"Unknown weight_type: {weight_type}")
        total_weighted_auc += user_auc * weight
        total_weight += weight
    if total_weight == 0:
        raise ValueError("No valid user found for AUC computation.")
    gauc = total_weighted_auc / total_weight
    return gauc, user_auc_dict

label = [0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0]
q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.9, 0.7, 0.9, 0.7]
user_id = [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3]
gauc_rank(user_id, label, q, weight_type='impression')
(np.float64(0.9318181818181818),
{np.int64(1): np.float64(1.0),
np.int64(2): np.float64(1.0),
np.int64(3): np.float64(0.75)})
PN¶
import numpy as np

def calculate_regression_pn(preds, labels):
    """
    PN (pairwise accuracy) for a regression task.
    This reference version is O(N^2); an O(N log N) version would replace the
    inner scans with a Fenwick tree (binary indexed tree) over label ranks.
    """
    preds = np.array(preds)
    labels = np.array(labels)
    # 1. Sort by predicted score, ascending
    idx = np.argsort(preds)
    preds_sorted = preds[idx]
    labels_sorted = labels[idx]
    n = len(labels)
    correct_pairs = 0.0
    total_pairs = 0.0
    # 2. Walk the samples in blocks of equal predictions (two pointers),
    #    counting each pair exactly once; only pairs with y_i != y_j enter the denominator
    l = 0
    while l < n:
        r = l
        while r < n and preds_sorted[r] == preds_sorted[l]:
            r += 1
        # Samples [l:r) share the same prediction
        for curr_idx in range(l, r):
            curr_label = labels_sorted[curr_idx]
            # a) Pairs against samples with strictly smaller predictions (0 .. l-1):
            #    the pair is correct iff the earlier label is also smaller.
            #    Plain loops keep the logic readable; a Fenwick tree makes this O(log N).
            for prev_idx in range(0, l):
                if curr_label > labels_sorted[prev_idx]:
                    correct_pairs += 1.0
                    total_pairs += 1.0
                elif curr_label < labels_sorted[prev_idx]:
                    total_pairs += 1.0
            # b) Pairs inside the tied block (each unordered pair visited once):
            #    by the AUC convention, a prediction tie contributes 0.5
            for peer_idx in range(curr_idx + 1, r):
                if curr_label != labels_sorted[peer_idx]:
                    correct_pairs += 0.5
                    total_pairs += 1.0
        l = r
    return correct_pairs / total_pairs if total_pairs > 0 else 0

# Example data
preds = [0.1, 0.9, 0.2, 0.8, 1.0]
labels = [1.2, 5.5, 2.0, 2.0, 7.1]  # continuous labels
pn_score = calculate_regression_pn(preds, labels)
print(f"Regression PN Score: {pn_score:.4f}")
Regression PN Score: 1.0000
3. Loss Functions¶
Cross entropy: binary vs. multi-class, what really differs¶
Conclusion: mathematically identical, but the engineering implementations must be distinguished, with an important numerical-stability difference.
Mathematical essence: one formula
The general definition of cross entropy is:
$$H(p, q) = -\sum_{c=1}^{C} p(c) \log q(c)$$
- Multi-class (C classes): sum over all classes
- Binary (C=2): substituting $p_1 = y,\ p_0 = 1-y$ reduces it to:
$$H = -[y \log q + (1-y)\log(1-q)]$$
So binary cross entropy is the C=2 special case of multi-class cross entropy; mathematically they agree exactly.
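A quick numerical check of this equivalence (a minimal sketch using PyTorch's built-in losses; the two-class construction below, with a fixed zero logit for class 0, is one standard way to line the two up):
import torch
import torch.nn.functional as F

x = torch.tensor([1.3, -0.7, 2.1])  # logits for the positive class
y = torch.tensor([1., 0., 1.])
bce = F.binary_cross_entropy_with_logits(x, y)
# Two-class CE on logits [0, x]: sigmoid(x) == softmax([0, x])[1]
ce = F.cross_entropy(torch.stack([torch.zeros_like(x), x], dim=1), y.long())
print(bce.item(), ce.item())  # identical up to float error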
Binary cross entropy (BCE) implementation¶
1. The basic BCE formula
The standard form of BCE is defined on probabilities. With true label $y$ (0 or 1) and $p$ the predicted probability of class 1 (the Sigmoid output), the loss $L$ is:
$$L = -[y \cdot \log(p) + (1-y) \cdot \log(1-p)]$$
The formula is intuitive but has a fatal flaw: as $p$ approaches 0 or 1, $\log(p)$ or $\log(1-p)$ hits log(0) and produces negative infinity, which is computationally catastrophic.
2. The numerically stable BCE formula
To fix this, instead of the model's probability $p$ we work directly with the raw pre-Sigmoid output, the logits, written $x$.
Substituting $p = \text{sigmoid}(x)$ into the original formula and simplifying yields an equivalent, highly stable form:
$$L = \max(x, 0) - x \cdot y + \log(1 + e^{-|x|})$$
The key advantage: the exponent $-|x|$ is never positive, so $e^{-|x|}$ stays within $(0, 1]$, eliminating any risk of floating-point overflow. It needs no epsilon clipping or similar tricks and is the numerically optimal formulation.
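The simplification is worth spelling out once. With $\log \sigma(x) = -\log(1+e^{-x})$ and $\log(1-\sigma(x)) = -x - \log(1+e^{-x})$:
$$L = y\log(1+e^{-x}) + (1-y)\left[x + \log(1+e^{-x})\right] = x - xy + \log(1+e^{-x})$$
For $x < 0$, the identity $\log(1+e^{-x}) = -x + \log(1+e^{x})$ turns this into $-xy + \log(1+e^{x})$; the two branches combine into the single expression $\max(x, 0) - xy + \log(1+e^{-|x|})$.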
import torch
import torch.nn as nn

def BCEloss_stable(y_predict, label):
    """The stable BCE formula must consume logits, not probabilities p."""
    loss = torch.clamp(y_predict, min=0) - y_predict * label + torch.log(1 + torch.exp(-torch.abs(y_predict)))
    return loss.mean()

y_predict = torch.tensor([5., -4., 5., -6.])
label = torch.tensor([1., 0., 1., 0.])
bce_loss = BCEloss_stable(y_predict, label)
print(f"BCE loss: {bce_loss}")
BCE loss: 0.008514078333973885
import torch

def BCEloss(y_predict, label):
    p = torch.sigmoid(y_predict)
    # or: p = 1 / (1 + torch.exp(-y_predict))
    per_sample_loss = -(label * torch.log(p) + (1 - label) * torch.log(1 - p))
    return per_sample_loss.mean()

y_predict = torch.tensor([5., -4., 5., -6.])
label = torch.tensor([1., 0., 1., 0.])
bce_loss = BCEloss(y_predict, label)
print(f"BCE loss: {bce_loss}")
BCE loss: 0.008514076471328735
Multi-Class Cross Entropy¶
In binary classification we model a Bernoulli distribution; in multi-class classification we are essentially modeling a Categorical distribution. With $C$ classes, the model outputs a logits vector:
$$ \mathbf{x} = (x_1, x_2, ..., x_C) $$
The true label is a one-hot vector $\mathbf{y}$, where:
$$ y_k = 1 \text{ means the true class is } k $$
- Basic form: Softmax + Cross Entropy
First convert the logits into a probability distribution with Softmax:
$$ p_i = \frac{e^{x_i}}{\sum_{j=1}^{C} e^{x_j}} $$
Cross entropy is defined as:
$$ L = -\sum_{i=1}^{C} y_i \log p_i $$
Since $y$ is one-hot, only the true class $k$ contributes, so the loss simplifies to:
$$ L = -\log p_k $$
Substituting the Softmax:
$$ L = -\log \left(\frac{e^{x_k}}{\sum_{j=1}^{C} e^{x_j}}\right) $$
which simplifies further to:
$$ L = -x_k + \log\left(\sum_{j=1}^{C} e^{x_j}\right) $$
- Mathematical essence: maximum likelihood estimation
Multi-class cross entropy is maximum likelihood estimation of a Categorical distribution:
$$ \mathcal{L} = \prod_{i=1}^{N} p_{y_i} $$
Taking the negative log:
$$ L = -\sum_{i=1}^{N} \log p_{y_i} $$
Hence:
multi-class cross entropy = Softmax + negative log-likelihood (NLL)
Implementation 1: explicit Softmax followed by log
- straightforward logic
- but does one extra division (the Softmax)
- slightly worse precision than the simplified form
import torch

def cross_entropy(logits, target):
    B, D = logits.shape
    # 0. Numerical stability: subtract the per-row max
    max_logits = torch.max(logits, dim=1, keepdim=True)[0]
    logits_stable = logits - max_logits
    # 1. Exponentiate
    exp_logits = torch.exp(logits_stable)  # B, D
    # 2. Row-wise sum
    exp_sum_logits = torch.sum(exp_logits, dim=-1, keepdim=True)
    # 3. Softmax probabilities
    softmax_logits = exp_logits / exp_sum_logits
    # 4. Pick the probability of the true class
    pos_logits = softmax_logits[torch.arange(B), target]
    # 5. Negative log
    loss = -torch.log(pos_logits)
    # 6. Average
    return loss.mean()

logits = torch.tensor([[1.0000, 0.4985, 0.6664, 0.2533],
                       [0.4985, 1.0000, 0.8408, 0.5431],
                       [0.6664, 0.8408, 1.0000, 0.8372],
                       [0.2533, 0.5431, 0.8372, 1.0000]])
target = torch.tensor([0, 1, 2, 3])
print(cross_entropy(logits, target))
tensor(1.1176)
Implementation 2: the simplified form $$ L = -x_k + \log\left(\sum_{j=1}^{C} e^{x_j}\right) $$
import torch

def cross_entropy(logits, target):
    """
    logits: (N, C)
    target: (N,)
    """
    B, D = logits.shape
    # 1. Numerical stability: subtract the per-row max so exp cannot overflow
    max_logits = torch.max(logits, dim=1, keepdim=True)[0]  # (B, D) -> (B, 1)
    logits_stable = logits - max_logits
    # 2. Compute log(sum(exp(...)))
    log_sum_exp = torch.log(torch.sum(torch.exp(logits_stable), dim=1))
    # 3. Logit of the true class
    correct_logits = logits_stable[torch.arange(B), target]  # row i, column target[i]; this fancy indexing needs NumPy/PyTorch tensors
    # 4. Negative log-likelihood
    loss = -correct_logits + log_sum_exp
    return loss.mean()

logits = torch.tensor([[1.0000, 0.4985, 0.6664, 0.2533],
                       [0.4985, 1.0000, 0.8408, 0.5431],
                       [0.6664, 0.8408, 1.0000, 0.8372],
                       [0.2533, 0.5431, 0.8372, 1.0000]])
target = torch.tensor([0, 1, 2, 3])
print(cross_entropy(logits, target))
tensor(1.1176)
import torch

def cross_entropy(logits, target):
    log_sum_exp = torch.logsumexp(logits, dim=-1)
    pos_logits = logits[torch.arange(logits.size(0)), target]
    loss = -pos_logits + log_sum_exp
    return loss.mean()

logits = torch.tensor([[1.0000, 0.4985, 0.6664, 0.2533],
                       [0.4985, 1.0000, 0.8408, 0.5431],
                       [0.6664, 0.8408, 1.0000, 0.8372],
                       [0.2533, 0.5431, 0.8372, 1.0000]])
target = torch.tensor([0, 1, 2, 3])
print(cross_entropy(logits, target))
tensor(1.1176)
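All three hand-written versions can be cross-checked against PyTorch's built-in, using the same logits and target:
import torch.nn.functional as F

print(F.cross_entropy(logits, target))  # tensor(1.1176), matching the manual implementations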
import torch

def cross_entropy_with_soft_target(y_true, y_pred, eps=1e-15):
    """
    y_true: shape (B, C), a probability distribution (e.g. label-smoothed targets or teacher-model outputs)
    y_pred: shape (B, C), must be post-Softmax probabilities
    """
    # 1. Numerical stability: keep log(0) from producing NaN
    #    torch.clamp squeezes the input into [min, max]
    y_pred = torch.clamp(y_pred, min=eps, max=1.0 - eps)
    # 2. Element-wise cross entropy: - y_true * log(y_pred)
    #    y_true and torch.log(y_pred) are combined by an element-wise product
    per_element_loss = -y_true * torch.log(y_pred)
    # 3. Sum over the class dimension to get per-sample losses, shape (B,)
    per_sample_loss = torch.sum(per_element_loss, dim=1)
    # 4. Average over the batch
    return torch.mean(per_sample_loss)

# --- Verification ---
B, C = 2, 3
# Soft labels (e.g. label smoothing: [0.9, 0.05, 0.05])
y_true = torch.tensor([[0.9, 0.05, 0.05],
                       [0.1, 0.8, 0.1]])
# Model output probabilities
y_pred = torch.tensor([[0.8, 0.1, 0.1],
                       [0.2, 0.7, 0.1]])
loss = cross_entropy_with_soft_target(y_true, y_pred)
print(f"Soft Target CE Loss: {loss.item():.6f}")
Soft Target CE Loss: 0.553815
softmax¶
import torch

def softmax(logits, dim=-1):
    # 1. Subtract the max for numerical stability
    max_logits = torch.max(logits, dim=dim, keepdim=True)[0]
    # 2. Map from logit space into the non-negative exponential space
    exp_logits = torch.exp(logits - max_logits)
    # 3. Normalization denominator
    exp_sum = torch.sum(exp_logits, dim=dim, keepdim=True)
    # 4. Final probability distribution
    return exp_logits / exp_sum

# Verification
sample_input = torch.tensor([[1.0, 2.0, 3.0, 1000.0],
                             [0.5, 0.5, 0.5, 0.5]])
print(softmax(sample_input))
tensor([[0.0000, 0.0000, 0.0000, 1.0000],
[0.2500, 0.2500, 0.2500, 0.2500]])
MSE¶
import numpy as np

def MSE_Loss(labels, y_predict):
    per_sample_loss = (labels - y_predict)**2
    return np.mean(per_sample_loss)

x = np.array([1, 2, 3, 4])
y = np.array([1, 2, 4, 4])
mse_loss = MSE_Loss(y, x)
print(f"MSE loss: {mse_loss}")
MSE loss: 0.25
Focal Loss¶
On heavily class-imbalanced data, standard cross entropy fails because the flood of easy examples dominates the loss. Focal Loss introduces a dynamic modulating factor that forces the model to focus on the hard examples during training, making it a key technique for this class of problems.
Core formula
Focal Loss extends standard cross entropy with a weight factor $\alpha$ and a modulating factor $(1-p_t)^\gamma$. Its unified form is compact yet powerful:
$$L_{\text{focal}} = -\alpha_t (1-p_t)^\gamma \log(p_t)$$
where:
- $p_t$ is defined as: $p_t = p$ when the true label is $y=1$, and $p_t = 1-p$ when $y=0$; here $p$ is the predicted probability of the positive class.
- $\alpha_t$ is the class-balancing parameter and $\gamma$ the focusing parameter, which suppresses the loss contribution of easy examples.
The class-balancing parameter α adjusts the overall positive/negative weighting: when positives are rare, raise α (e.g. 0.5 → 0.75 or even higher) to increase their share of the total loss; if negatives matter more or false positives are costlier, lower it. It is essentially a linear re-weighting against class-frequency imbalance. The focusing parameter γ controls how strongly easy examples are suppressed: γ=0 recovers plain cross entropy; larger γ (commonly 1–3, default 2) decays the loss on high-confidence examples more aggressively, pushing the model toward hard examples, but values that are too large can destabilize or slow down training. In practice, set α from the class ratio first, then tune γ within [1, 3] against validation performance.
This implementation takes the model's probabilities (after Sigmoid) as input.
import numpy as np

def focal_loss_prob(y_pred_prob, y_true, alpha=0.25, gamma=2.0, reduction="mean", eps=1e-9):
    p = np.clip(y_pred_prob, eps, 1 - eps)
    pt = y_true * p + (1 - y_true) * (1 - p)
    alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)
    loss = -alpha_t * (1 - pt) ** gamma * np.log(pt)
    if reduction == "mean":
        return np.mean(loss)
    elif reduction == "sum":
        return np.sum(loss)
    else:
        return loss

x = np.array([0.9, 0.1, 0.9, 0.7])
y = np.array([1, 0, 1, 0])
focal_loss_value = focal_loss_prob(x, y)
print(f"focal loss: {focal_loss_value}")
focal loss: 0.11094425300887605
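For training, one would usually feed logits rather than probabilities, reusing the stable-BCE decomposition from earlier in this section, since $-\log(p_t)$ is exactly the per-element BCE. A sketch of that variant (focal_loss_logits is an illustrative name, not part of the original notes):
import numpy as np

def focal_loss_logits(logits, y_true, alpha=0.25, gamma=2.0):
    p = 1.0 / (1.0 + np.exp(-logits))  # only used for the modulating factor
    pt = y_true * p + (1 - y_true) * (1 - p)
    alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)
    # stable per-element BCE: max(x, 0) - x*y + log(1 + exp(-|x|)) == -log(pt)
    bce = np.maximum(logits, 0) - logits * y_true + np.log1p(np.exp(-np.abs(logits)))
    return np.mean(alpha_t * (1 - pt) ** gamma * bce)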
InfoNCE¶
1. Problem setup: a contrastive prediction task
Given:
- an anchor representation $q$
- a positive sample $k^+$ (matched with q)
- K negative samples ${k_1^-, \dots, k_K^-}$
Build the candidate set:
$$ \mathcal{K} = \{k^+, k_1^-, \dots, k_K^-\} $$
Goal:
among these (K+1) candidates, correctly identify the one that matches q.
This is essentially a (K+1)-way classification problem.
2. Modeling the conditional probability with softmax
Define a scoring function (similarity):
$$ s(q, k) = \frac{f(q)^\top f(k)}{\tau} $$
where:
- $ f(\cdot) $ is the encoder
- $ \tau $ is the temperature
Define the conditional probability:
$$ p(k_i \mid q) = \frac{\exp(s(q,k_i))} {\sum_{k_j \in \mathcal{K}} \exp(s(q,k_j))} $$
which is exactly the softmax form.
3. Maximum likelihood estimation
We want the model to maximize the probability of picking the positive:
$$ \max \log p(k^+ \mid q) $$
Expanding:
$$ \log \frac{\exp(s(q,k^+))} {\sum_{k_j \in \mathcal{K}} \exp(s(q,k_j))} $$
Negating gives the loss:
$$ \mathcal{L}_{InfoNCE} = - \log \frac{\exp(s(q,k^+))} {\exp(s(q,k^+)) + \sum_{j=1}^{K} \exp(s(q,k_j^-))} $$
This is the basic form of InfoNCE.
4. Equivalence to cross entropy
Let:
- the logits be $ s(q, k_i) $
- the positive sample have index 0
Then
$$ \mathcal{L} = \text{CrossEntropy}(\text{logits}, \text{label}=0) $$
Therefore:
InfoNCE is essentially a temperature-scaled softmax cross entropy.
5. The density-ratio view (the deeper essence)
Assume the data come from the joint distribution $p(q,k)$ and the negatives from the marginal $p(k)$.
In theory:
$$ \frac{p(q,k)}{p(q)p(k)} \propto \exp(s(q,k)) $$
InfoNCE is actually learning this density ratio.
Oord et al. prove in the CPC paper that:
$$ I(q;k) \ge \log(K+1) - \mathcal{L}_{InfoNCE} $$
That is:
minimizing InfoNCE maximizes a lower bound on the mutual information.
6. The standard final expression
Per-sample form:
$$ \boxed{ \mathcal{L} = - \log \frac{\exp(f(q)^\top f(k^+)/\tau)} {\sum_{k \in \mathcal{K}} \exp(f(q)^\top f(k)/\tau)} } $$
Batch form (in-batch negatives):
$$ \mathcal{L} = - \frac{1}{N} \sum_{i=1}^{N} \log \frac{\exp(z_i^\top z_i^+/\tau)} {\sum_{j=1}^{N} \exp(z_i^\top z_j/\tau)} $$
7. Summary
The core idea of InfoNCE:
- turn a matching problem into a classification problem
- model the probability with softmax
- optimize with cross entropy
- equivalently, learn a density ratio
- equivalently, maximize a mutual-information lower bound
From an engineering standpoint: it is just a temperature-scaled softmax cross entropy. From a theoretical standpoint: it is a mutual-information estimator.
import torch
import torch.nn.functional as F

def infoNCE(user_emb, item_emb, temperature=0.5):
    # Normalize
    user_emb = F.normalize(user_emb, dim=-1)
    item_emb = F.normalize(item_emb, dim=-1)
    # Similarity matrix
    sim = torch.matmul(user_emb, item_emb.transpose(0, 1)) / temperature
    # Labels: the diagonal pairs are the positives
    labels = torch.arange(user_emb.size(0), device=user_emb.device)
    # Multi-class cross entropy
    loss = F.cross_entropy(sim, labels)  # computes the cross entropy between input logits and target
    return loss

user_emb = torch.tensor([[1.1, 24.0, 5.9], [3.3, 9.2, 32.1], [12.1, 11.1, 15.3], [0.8, 0.1, 0.4]])
item_emb = torch.tensor([[1.1, 24.0, 5.9], [3.3, 9.2, 32.1], [12.1, 11.1, 15.3], [0.8, 0.1, 0.4]])
print(infoNCE(user_emb, item_emb, 1))
tensor(1.1176)
import torch
import torch.nn.functional as F

def info_nce(user_emb, item_emb, temperature=1.0):
    user_emb = F.normalize(user_emb, dim=-1)
    item_emb = F.normalize(item_emb, dim=-1)
    sim = torch.matmul(user_emb, item_emb.T) / temperature
    labels = torch.arange(user_emb.size(0), device=user_emb.device)
    loss_u2i = F.cross_entropy(sim, labels)
    loss_i2u = F.cross_entropy(sim.T, labels)
    loss = (loss_u2i + loss_i2u) / 2
    return loss

user_emb = torch.tensor([[1.1, 24.0, 5.9], [3.3, 9.2, 32.1], [12.1, 11.1, 15.3], [0.8, 0.1, 0.4]])
item_emb = torch.tensor([[1.1, 24.0, 5.9], [3.3, 9.2, 32.1], [12.1, 11.1, 15.3], [0.8, 0.1, 0.4]])
print(info_nce(user_emb, item_emb, 1))
tensor(1.1176)
Hand-written MLP¶
import torch
import torch.nn as nn

class MLP(nn.Module):
    """
    A complete multi-layer perceptron (MLP).
    Args:
        input_dim (int): input feature dimension
        hidden_dims (list): hidden-layer sizes, one entry per hidden layer
        output_dim (int): output dimension
        activation (str): activation type, one of 'relu', 'tanh', 'sigmoid'
        dropout_rate (float): dropout probability for regularization
        batch_norm (bool): whether to apply normalization
    """
    def __init__(self, input_dim, hidden_dims, output_dim, activation='relu', dropout_rate=0.0, batch_norm=False):
        super(MLP, self).__init__()
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.output_dim = output_dim
        self.activation_name = activation
        self.dropout_rate = dropout_rate
        self.batch_norm = batch_norm
        # Build the network layers
        layers = []
        layer_dims = [input_dim] + hidden_dims + [output_dim]
        # Construct the network layer by layer
        for i in range(len(layer_dims) - 1):
            # Linear transform
            linear_layer = nn.Linear(layer_dims[i], layer_dims[i + 1])
            layers.append(linear_layer)
            # For every layer except the last, add normalization, activation and dropout
            if i < len(layer_dims) - 2:
                # Normalization
                if self.batch_norm:
                    layers.append(nn.LayerNorm(layer_dims[i + 1]))
                # Activation
                if activation == 'relu':
                    layers.append(nn.ReLU())
                elif activation == 'tanh':
                    layers.append(nn.Tanh())
                elif activation == 'sigmoid':
                    layers.append(nn.Sigmoid())
                else:
                    raise ValueError(f"Unsupported activation: {activation}")
                # Dropout regularization
                if dropout_rate > 0:
                    layers.append(nn.Dropout(dropout_rate))
        # Combine all layers into a Sequential module
        self.network = nn.Sequential(*layers)
        # Initialize the parameters
        self._initialize_weights()

    def _initialize_weights(self):
        """
        Parameter initialization.
        Uses Xavier/Glorot initialization, which matters for training deeper networks.
        """
        for module in self.modules():
            if isinstance(module, nn.Linear):
                # Xavier uniform initialization
                nn.init.xavier_uniform_(module.weight)
                # Zero the biases
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)

    def forward(self, x):
        return self.network(x)
import torch
import torch.nn as nn

# Typical ordering: Linear -> LayerNorm -> Activation -> Dropout
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim, activation='relu', dropout=0.1, layernorm=True):
        super().__init__()
        layer_dims = [input_dim] + hidden_dims + [output_dim]
        layers = []
        for i in range(len(layer_dims) - 1):
            layers.append(nn.Linear(layer_dims[i], layer_dims[i+1]))
            if i < len(layer_dims) - 2:  # no activation/Dropout/Norm on the last layer
                if layernorm:
                    layers.append(nn.LayerNorm(layer_dims[i+1]))  # nn.LayerNorm must be given the feature size (the size of the input's last dimension)
                if activation == 'relu':
                    layers.append(nn.ReLU())
                elif activation == 'sigmoid':
                    layers.append(nn.Sigmoid())
                elif activation == 'tanh':
                    layers.append(nn.Tanh())
                # Dropout sees the post-ReLU activations and randomly zeroes some of them,
                # directly shaping what gets passed to the next layer
                if dropout > 0:
                    layers.append(nn.Dropout(dropout))
        # * is the unpacking operator: it takes the elements of an iterable (list, tuple)
        # and passes them as separate arguments to the function
        self.network = nn.Sequential(*layers)
        self.initialize_weight()

    def initialize_weight(self):
        for module in self.modules():  # self.modules() yields the whole MLP, then each Linear, LayerNorm, ReLU, ... in turn
            if isinstance(module, nn.Linear):  # Linear layers have a weight matrix and a bias vector to initialize
                nn.init.xavier_uniform_(module.weight)  # in PyTorch, a trailing underscore marks an in-place operation: it mutates the tensor instead of returning a new one
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)

    def forward(self, x):
        return self.network(x)

mlp = MLP(32, [128, 64, 32], 1)
x = torch.randn(1, 2, 32)
print(x)
y = mlp(x)
print(y)
tensor([[[-1.2597, -0.3917, -2.1761, -1.6370, 0.9768, -0.1885, 0.2397,
0.1150, 0.9735, 1.2439, -0.0406, 0.5806, 1.0668, 0.0576,
-0.1426, 0.6159, -0.3554, 1.6073, 3.1268, -0.0107, 0.2118,
-0.8624, 1.1218, -0.4589, 1.9104, -0.9769, 0.3840, 0.5005,
0.6873, 0.2684, -0.0317, 0.2847],
[ 0.1464, 0.3139, -1.0614, -0.2200, 0.0767, -1.1065, 0.4512,
-1.3406, 1.3963, -0.7353, -1.5034, -0.8260, 0.6051, -0.4990,
-0.0943, -0.3740, 1.2251, -0.1666, -0.2752, -0.2456, 1.2412,
-0.5601, 0.0153, 0.4588, -1.2241, -2.3180, -1.7191, -0.8325,
0.4759, 0.6389, -0.8476, 0.1833]]])
tensor([[[-0.4256],
[-0.3233]]], grad_fn=<ViewBackward0>)
import torch
import torch.nn as nn
import torch.optim as optim

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim, dropout=0.0):
        """
        input_dim: input dimension
        hidden_dims: hidden-layer sizes, e.g. [128, 64]
        output_dim: output dimension
        """
        super().__init__()
        layers = []
        prev_dim = input_dim
        for h_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, h_dim))
            layers.append(nn.ReLU())
            if dropout > 0:
                layers.append(nn.Dropout(dropout))
            prev_dim = h_dim
        layers.append(nn.Linear(prev_dim, output_dim))
        self.net = nn.Sequential(*layers)
        self.criterion = nn.BCEWithLogitsLoss()  # built-in numerically stable sigmoid + BCE

    def forward(self, x):
        return self.net(x)  # returns logits, no sigmoid

    def train_step(self, x, y, optimizer):
        self.train()
        optimizer.zero_grad()
        logits = self.forward(x).squeeze(-1)
        loss = self.criterion(logits, y.float())
        loss.backward()
        optimizer.step()
        return loss.item()

    @torch.no_grad()
    def predict(self, x):
        self.eval()
        logits = self.forward(x).squeeze(-1)
        return torch.sigmoid(logits)

# ── Usage example ──────────────────────────────────────
if __name__ == '__main__':
    torch.manual_seed(42)
    X = torch.randn(1000, 16)
    y = (X[:, 0] + X[:, 1] > 0).float()
    model = MLP(input_dim=16, hidden_dims=[64, 32], output_dim=1, dropout=0.1)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    for epoch in range(20):
        loss = model.train_step(X, y, optimizer)
        if (epoch + 1) % 5 == 0:
            probs = model.predict(X)
            acc = ((probs > 0.5).float() == y).float().mean()
            print(f"Epoch {epoch+1:3d} | loss: {loss:.4f} | acc: {acc:.4f}")
Epoch 5 | loss: 0.6776 | acc: 0.7000 Epoch 10 | loss: 0.6613 | acc: 0.7940 Epoch 15 | loss: 0.6406 | acc: 0.8190 Epoch 20 | loss: 0.6214 | acc: 0.8370
import torch
import math

# Optimizer base class
class Optimizer:
    def __init__(self, params, lr):
        """
        params: every parameter to update, e.g. [w1, b1, w2, b2, ...]
        """
        self.params = params
        self.lr = lr

    def step(self):
        raise NotImplementedError

    def zero_grad(self):
        for p in self.params:
            if p.grad is not None:
                p.grad.zero_()

# SGD optimizer
class SGD(Optimizer):
    def __init__(self, params, lr=1e-2, momentum=0.0):
        """
        momentum: momentum coefficient; 0 gives plain SGD
        Momentum update: v = momentum * v - lr * grad
                         w = w + v
        """
        super().__init__(params, lr)
        self.momentum = momentum
        # One velocity buffer v per parameter
        self.velocity = [torch.zeros_like(p) for p in self.params]

    def step(self):
        with torch.no_grad():
            for p, v in zip(self.params, self.velocity):
                if p.grad is None:
                    continue
                v.mul_(self.momentum).sub_(self.lr * p.grad)  # v = momentum*v - lr*grad
                p.add_(v)                                     # w = w + v

# Adam optimizer
class Adam(Optimizer):
    def __init__(self, params, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
        """
        beta1: decay rate of the first moment (gradient mean)
        beta2: decay rate of the second moment (gradient variance)
        eps: small constant guarding against division by zero
        """
        super().__init__(params, lr)
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.t = 0  # time step, used for bias correction
        self.m = [torch.zeros_like(p) for p in self.params]  # first moment
        self.v = [torch.zeros_like(p) for p in self.params]  # second moment

    def step(self):
        self.t += 1
        # Bias-correction factors: for small t, m/v are underestimates and need scaling up
        bias_correction1 = 1 - self.beta1 ** self.t
        bias_correction2 = 1 - self.beta2 ** self.t
        with torch.no_grad():
            for p, m, v in zip(self.params, self.m, self.v):
                if p.grad is None:
                    continue
                # First moment: exponential moving average of the gradient
                m.mul_(self.beta1).add_((1 - self.beta1) * p.grad)
                # Second moment: exponential moving average of the squared gradient
                v.mul_(self.beta2).add_((1 - self.beta2) * p.grad ** 2)
                m_hat = m / bias_correction1  # bias-corrected first moment
                v_hat = v / bias_correction2  # bias-corrected second moment
                p.sub_(self.lr * m_hat / (v_hat.sqrt() + self.eps))

# Linear layer
class Linear:
    def __init__(self, in_dim, out_dim):
        std = math.sqrt(2.0 / in_dim)
        self.w = torch.randn(in_dim, out_dim) * std
        self.b = torch.zeros(out_dim)
        self.w.requires_grad_(True)
        self.b.requires_grad_(True)

    def forward(self, x):
        return x @ self.w + self.b

    @property
    def params(self):
        return [self.w, self.b]

# ══════════════════════════════════════════════════
# MLP model
# ══════════════════════════════════════════════════
class ManualMLP:
    def __init__(self, input_dim, hidden_dims, output_dim):
        self.layers = []
        dims = [input_dim] + hidden_dims + [output_dim]
        for i in range(len(dims) - 1):
            self.layers.append(Linear(dims[i], dims[i + 1]))

    def parameters(self):
        """Collect the parameters of every layer for the optimizer (mirrors nn.Module.parameters())"""
        params = []
        for layer in self.layers:
            params.extend(layer.params)
        return params

    def relu(self, x):
        return torch.clamp(x, min=0)

    def sigmoid(self, x):
        return 1 / (1 + torch.exp(-x))

    def bce_loss(self, logits, y_true):
        loss = (torch.clamp(logits, min=0)
                - logits * y_true
                + torch.log(1 + torch.exp(-torch.abs(logits))))
        return loss.mean()

    def forward(self, x):
        out = x
        for i, layer in enumerate(self.layers):
            out = layer.forward(out)
            if i != len(self.layers) - 1:
                out = self.relu(out)
        return out

    def train_step(self, x, y_true, optimizer):
        logits = self.forward(x).squeeze(-1)
        loss = self.bce_loss(logits, y_true)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        return loss.item()

    @torch.no_grad()
    def predict(self, x):
        logits = self.forward(x).squeeze(-1)
        return self.sigmoid(logits)

# ══════════════════════════════════════════════════
# Usage example
# ══════════════════════════════════════════════════
if __name__ == '__main__':
    torch.manual_seed(42)
    X = torch.randn(1000, 16)
    y = (X[:, 0] + X[:, 1] > 0).float()
    model = ManualMLP(input_dim=16, hidden_dims=[64, 32], output_dim=1)
    # ✅ Swapping optimizers is a one-line change; the model code is untouched
    # optimizer = SGD(model.parameters(), lr=1e-2, momentum=0.9)
    optimizer = Adam(model.parameters(), lr=1e-3)
    for epoch in range(20):
        loss = model.train_step(X, y, optimizer)
        if (epoch + 1) % 5 == 0:
            probs = model.predict(X)
            acc = ((probs > 0.5).float() == y).float().mean()
            print(f"Epoch {epoch+1:3d} | loss: {loss:.4f} | acc: {acc:.4f}")
Epoch 5 | loss: 0.6682 | acc: 0.6150 Epoch 10 | loss: 0.6039 | acc: 0.6860 Epoch 15 | loss: 0.5504 | acc: 0.7550 Epoch 20 | loss: 0.5024 | acc: 0.8020
Mixed-Precision Training (AMP)¶
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 1. Basic configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.bfloat16

# 2. Data preparation
# transforms.ToTensor(): first, type conversion (numpy/image data to tensor); second, value
# normalization, scaling 0-255 pixel values into 0-1; third, dimension reordering to [channels, height, width].
# Compose chains multiple transforms into one pipeline that the data passes through in order.
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.MNIST(root=r'D:\科研\搜广推\04_手撕算法题\大模型_推荐算法_手撕题\data', train=True, download=False, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True, pin_memory=True, num_workers=4)

# 3. Model
class simplemodel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)

# Loss, optimizer, etc.
"""
What must move to the GPU is the tensor data involved in heavy computation: model parameters and inputs.
The loss function and optimizer are "tools" that operate on those tensors; they run on whatever device
the tensors live on, so they need no manual move.
"""
model = simplemodel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 4. Training setup
"""
Training mode
  Dropout randomly zeroes neuron outputs with probability p, an effective regularizer against overfitting.
  BatchNorm normalizes with the mean/variance of the current batch, while also maintaining global running
  statistics updated by a moving average over all training batches.
Inference mode
  Dropout switches off: no neurons are dropped, so the model's predictions are deterministic and stable.
  BatchNorm freezes: it stops using batch statistics and normalizes with the global running mean/variance
  learned over the training set, so even a single-sample input gets consistent results.
"""
model.train()

def main():
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device, non_blocking=True), targets.to(device, non_blocking=True)
        with torch.autocast("cuda", dtype=dtype):
            outputs = model(inputs)
            loss = criterion(outputs, targets)
        loss.backward()   # PyTorch computes a gradient for every parameter; by default new gradients are added to any existing ones rather than overwriting them
        optimizer.step()  # apply the update
        optimizer.zero_grad()
        if batch_idx % 10 == 0:
            print(f"epoch[{1}], step[{batch_idx}], loss:{loss.item()}")

if __name__ == '__main__':
    main()
epoch[1], step[0], loss:2.3021240234375 epoch[1], step[10], loss:2.2501220703125 epoch[1], step[20], loss:2.13714599609375 epoch[1], step[30], loss:1.9133148193359375 epoch[1], step[40], loss:1.6000289916992188 epoch[1], step[50], loss:1.2425899505615234 epoch[1], step[60], loss:0.8824491500854492 epoch[1], step[70], loss:0.787874698638916 epoch[1], step[80], loss:0.6451437473297119 epoch[1], step[90], loss:0.5639933347702026 epoch[1], step[100], loss:0.5306609869003296 epoch[1], step[110], loss:0.4603962302207947 epoch[1], step[120], loss:0.39849042892456055 epoch[1], step[130], loss:0.4373186528682709 epoch[1], step[140], loss:0.44588130712509155 epoch[1], step[150], loss:0.44594523310661316 epoch[1], step[160], loss:0.37336426973342896 epoch[1], step[170], loss:0.3856193721294403 epoch[1], step[180], loss:0.4086637496948242 epoch[1], step[190], loss:0.3687782883644104 epoch[1], step[200], loss:0.33676016330718994 epoch[1], step[210], loss:0.36682552099227905 epoch[1], step[220], loss:0.3146677613258362 epoch[1], step[230], loss:0.3786121606826782
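The loop above relies on bfloat16, which keeps float32's exponent range and therefore needs no loss scaling. With float16 the standard recipe adds a GradScaler; a minimal sketch of that variant, assuming the same model, criterion and optimizer as above:
scaler = torch.cuda.amp.GradScaler()
for inputs, targets in train_loader:
    inputs, targets = inputs.to(device), targets.to(device)
    with torch.autocast("cuda", dtype=torch.float16):
        loss = criterion(model(inputs), targets)
    scaler.scale(loss).backward()  # scale the loss so fp16 gradients do not underflow
    scaler.step(optimizer)         # unscales gradients; skips the step if inf/NaN appeared
    scaler.update()                # adapt the scale factor
    optimizer.zero_grad()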
MMOE¶
import torch
import torch.nn as nn
import torch.nn.functional as F

class MMOE(nn.Module):
    def __init__(self, input_dim, num_experts, num_tasks, expert_hidden_dims, task_hidden_dims):
        super().__init__()
        self.input_dim = input_dim
        self.num_experts = num_experts
        self.num_tasks = num_tasks
        # Expert networks
        self.experts = nn.ModuleList([
            self._build_mlp(self.input_dim, expert_hidden_dims)
            for _ in range(self.num_experts)
        ])
        # Gate networks: one independent gate per task, output size num_experts
        self.gates = nn.ModuleList([
            self._build_mlp(self.input_dim, [num_experts])
            for _ in range(self.num_tasks)
        ])
        # Task-tower input size = output size of the experts' last layer
        expert_output_dim = expert_hidden_dims[-1] if expert_hidden_dims else self.input_dim
        self.tasks = nn.ModuleList([
            self._build_mlp(expert_output_dim, task_hidden_dims)
            for _ in range(num_tasks)
        ])

    def _build_mlp(self, input_dim, hidden_dims):
        layers = []
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.GELU())
            input_dim = hidden_dim
        return nn.Sequential(*layers)

    def forward(self, x):
        # Expert outputs: list of (B, expert_output_dim) -> stacked to (B, num_experts, expert_output_dim)
        expert_outputs = [expert(x) for expert in self.experts]      # list of num_experts tensors (B, expert_output_dim)
        expert_outputs_stacked = torch.stack(expert_outputs, dim=1)  # (B, num_experts, expert_output_dim)
        # Gate outputs: list of (B, num_experts), softmax-normalized
        gate_outputs = [F.softmax(gate(x), dim=-1) for gate in self.gates]  # list of num_tasks tensors (B, num_experts)
        task_outputs = []
        for i in range(self.num_tasks):
            # current_gate_weights: (B, num_experts)
            # unsqueeze(-1) -> (B, num_experts, 1), broadcast-multiplied with expert_outputs_stacked
            current_gate_weights = gate_outputs[i]
            weighted_experts = expert_outputs_stacked * current_gate_weights.unsqueeze(-1)
            task_specific_input = torch.sum(weighted_experts, dim=1)  # (B, expert_output_dim)
            task_output = self.tasks[i](task_specific_input)          # (B, task_output_dim)
            task_outputs.append(task_output)
        return task_outputs

# ── Test ──────────────────────────────────────────────
input_dim = 32
num_experts = 3
num_tasks = 2
expert_hidden_dims = [64, 128, 72]
task_hidden_dims = [144, 192, 96, 32, 1]
mmoe_network = MMOE(input_dim, num_experts, num_tasks, expert_hidden_dims, task_hidden_dims)
user_emb = torch.randn([10, 32])
task_outputs = mmoe_network(user_emb)
for i, output in enumerate(task_outputs):
    print(f"Task {i} output shape: {output.shape}")
Task 0 output shape: torch.Size([10, 1]) Task 1 output shape: torch.Size([10, 1])
import torch
import torch.nn as nn
import torch.nn.functional as F

# Typical ordering: Linear -> LayerNorm -> Activation -> Dropout
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim, activation='relu', dropout=0.1, layernorm=True):
        super().__init__()
        layer_dims = [input_dim] + hidden_dims + [output_dim]
        layers = []
        for i in range(len(layer_dims) - 1):
            layers.append(nn.Linear(layer_dims[i], layer_dims[i+1]))
            if i < len(layer_dims) - 2:  # no activation/Dropout/Norm on the last layer
                if layernorm:
                    layers.append(nn.LayerNorm(layer_dims[i+1]))
                if activation == 'relu':
                    layers.append(nn.ReLU())
                elif activation == 'sigmoid':
                    layers.append(nn.Sigmoid())
                elif activation == 'tanh':
                    layers.append(nn.Tanh())
                if dropout > 0:
                    layers.append(nn.Dropout(dropout))
        self.network = nn.Sequential(*layers)
        self.initialize_weight()

    def initialize_weight(self):
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)

    def forward(self, x):
        return self.network(x)

class MMOE(nn.Module):
    def __init__(self, input_dim, num_experts, num_tasks, expert_hidden_dims, task_hidden_dims):
        super().__init__()
        self.num_experts = num_experts
        self.num_tasks = num_tasks
        # ── Expert networks ──────────────────────────────
        # expert_hidden_dims = [64, 128, 72]
        # -> MLP(input_dim=32, hidden_dims=[64, 128], output_dim=72)
        self.experts = nn.ModuleList([
            MLP(input_dim, expert_hidden_dims[:-1], expert_hidden_dims[-1])
            for _ in range(num_experts)
        ])
        # ── Gate networks ────────────────────────────────
        # A gate only needs num_experts logits, no hidden layers
        # -> MLP(input_dim=32, hidden_dims=[], output_dim=num_experts)
        self.gates = nn.ModuleList([
            MLP(input_dim, [], num_experts)
            for _ in range(num_tasks)
        ])
        # ── Task towers ──────────────────────────────────
        # task_hidden_dims = [144, 192, 96, 32, 1]
        # -> MLP(input_dim=expert_output_dim, hidden_dims=[144,192,96,32], output_dim=1)
        expert_output_dim = expert_hidden_dims[-1]
        self.tasks = nn.ModuleList([
            MLP(expert_output_dim, task_hidden_dims[:-1], task_hidden_dims[-1])
            for _ in range(num_tasks)
        ])

    def forward(self, x):
        # Expert outputs: list of (B, expert_output_dim) -> stacked to (B, num_experts, expert_output_dim)
        expert_outputs = [expert(x) for expert in self.experts]
        expert_outputs_stacked = torch.stack(expert_outputs, dim=1)
        # Gate outputs: list of (B, num_experts), softmax-normalized
        gate_outputs = [F.softmax(gate(x), dim=-1) for gate in self.gates]
        task_outputs = []
        for i in range(self.num_tasks):
            # current_gate_weights: (B, num_experts)
            # unsqueeze(-1) -> (B, num_experts, 1), broadcast-multiplied with expert_outputs_stacked
            current_gate_weights = gate_outputs[i]
            weighted_experts = expert_outputs_stacked * current_gate_weights.unsqueeze(-1)
            task_specific_input = torch.sum(weighted_experts, dim=1)  # (B, expert_output_dim)
            task_output = self.tasks[i](task_specific_input)          # (B, task_output_dim)
            task_outputs.append(task_output)
        return task_outputs

# ── Test ──────────────────────────────────────────────
input_dim = 32
num_experts = 3
num_tasks = 2
expert_hidden_dims = [64, 128, 72]
task_hidden_dims = [144, 192, 96, 32, 1]
mmoe_network = MMOE(input_dim, num_experts, num_tasks, expert_hidden_dims, task_hidden_dims)
user_emb = torch.randn([10, 32])
task_outputs = mmoe_network(user_emb)
for i, output in enumerate(task_outputs):
    print(f"Task {i} output shape: {output.shape}")
Task 0 output shape: torch.Size([10, 1]) Task 1 output shape: torch.Size([10, 1])
RQVAE¶
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.cluster import KMeans
import numpy as np
from tqdm import tqdm

# --- 1. Building blocks ---
# First define the independent, reusable modules that make up the full model.

class RQEncoder(nn.Module):
    """
    Encoder module:
    compresses a high-dimensional input vector into a low-dimensional latent representation.
    """
    def __init__(self, input_dim: int, hidden_dims: list, latent_dim: int):
        super().__init__()
        layers = []
        in_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(in_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU()
            ])
            in_dim = hidden_dim
        layers.append(nn.Linear(in_dim, latent_dim))
        self.encoder = nn.Sequential(*layers)

    def forward(self, x):
        return self.encoder(x)

class RQDecoder(nn.Module):
    """
    Decoder module:
    reconstructs the original dimensionality from the quantized low-dimensional vector.
    """
    def __init__(self, latent_dim: int, hidden_dims: list, output_dim: int):
        super().__init__()
        layers = []
        in_dim = latent_dim
        for hidden_dim in reversed(hidden_dims):
            layers.extend([
                nn.Linear(in_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.ReLU()
            ])
            in_dim = hidden_dim
        layers.append(nn.Linear(in_dim, output_dim))
        self.decoder = nn.Sequential(*layers)

    def forward(self, x):
        return self.decoder(x)

class VQEmbedding(nn.Module):
    """
    Single-layer vector quantization (Vector Quantization Embedding).
    Holds one codebook and maps each input vector to its nearest codebook vector.
    """
    def __init__(self, num_embeddings: int, embedding_dim: int, commitment_cost: float = 0.25):
        super().__init__()
        # defines the size of the codebook
        self.num_embeddings = num_embeddings
        # must be identical to the dimensionality of the latent
        self.embedding_dim = embedding_dim
        # to balance the encoder's learning and the codebook's learning
        self.commitment_cost = commitment_cost
        # Register the codebook as a learnable parameter
        """The matrix is filled with random numbers sampled from a standard normal distribution
        (mean 0, variance 1). This serves as the initial state of our codebook before
        it is trained or properly initialized with K-means."""
        self.embeddings = nn.Parameter(torch.randn(num_embeddings, embedding_dim))
        # State flag ensuring the data-driven (K-Means) codebook initialization
        # runs only once and is never re-triggered during training
        self.initialized_with_data = False

    def initialize_from_data(self, data: torch.Tensor):
        """
        K-Means gives the codebook a well-founded, high-quality initial state from a small
        batch of real data, avoiding the random-initialization trap where training stalls
        early or the codebook collapses.
        Args:
            data: [n_samples, embedding_dim], latent vectors obtained by passing the raw
                embeddings through the untrained encoder; they represent the initial
                distribution of the data in latent space.
        """
        if self.initialized_with_data:
            return
        # Convert the PyTorch tensor to a NumPy array
        data_np = data.detach().cpu().numpy()
        n_samples = data_np.shape[0]
        if n_samples < self.num_embeddings:
            # Fewer samples than codebook entries: sample with replacement
            # (draw self.num_embeddings indices from the n_samples available)
            indices = np.random.choice(n_samples, self.num_embeddings, replace=True)
            centroids = data_np[indices]
        else:
            # Build a K-Means clusterer; n_init: number of independent runs, max_iter: iterations per run
            kmeans = KMeans(n_clusters=self.num_embeddings, n_init='auto', max_iter=100)
            # Run K-Means on the data
            kmeans.fit(data_np)
            # Pull the computed cluster centers out of .cluster_centers_
            centroids = kmeans.cluster_centers_
        """
        torch.from_numpy(centroids): converts the NumPy centroid array back into a PyTorch tensor.
        .copy_(...): an in-place operation that copies the centroid tensor into the
        self.embeddings parameter. Going through .data manipulates the raw storage directly
        and avoids disturbing the autograd graph.
        """
        self.embeddings.data.copy_(torch.from_numpy(centroids))
        self.initialized_with_data = True

    def forward(self, inputs: torch.Tensor):
        """
        inputs: [batch_size, embedding_dim]
        For each continuous vector coming from the encoder, forward performs four core tasks:
        - nearest-neighbor lookup: find the best-matching codebook vector;
        - quantization losses: one term updates the codebook itself, the other updates the encoder;
        - the gradient "trick": a straight-through estimator that lets gradients cross the
          non-differentiable quantization step;
        - outputs: the quantized vectors, the chosen indices (the semantic codes), and the loss.
        """
        # Efficient squared Euclidean distance from every input to every codebook vector
        # inputs: (B, D); self.embeddings: (K, D)
        # (B, 1) + (K,) - (B, K), all broadcast to (B, K)
        distances = (
            torch.sum(inputs**2, dim=1, keepdim=True) +
            torch.sum(self.embeddings**2, dim=1) -
            2 * torch.matmul(inputs, self.embeddings.t())
        )
        # Quantize: for each row of the [batch_size, num_embeddings] distance matrix (dim=1),
        # take the index of the smallest entry, i.e. the nearest codebook vector
        indices = torch.argmin(distances, dim=1)
        # Fetch the quantized vectors by index, [batch_size, embedding_dim]
        quantized = F.embedding(indices, self.embeddings)
        # Codebook loss: gradients flow only into `quantized` (the codebook), not the encoder
        codebook_loss = F.mse_loss(quantized, inputs.detach())
        # Commitment loss: gradients flow only into the encoder, not `quantized`
        commitment_loss = F.mse_loss(inputs, quantized.detach()) * self.commitment_cost
        # Total quantization loss for this layer
        total_loss = codebook_loss + commitment_loss
        # Straight-Through Estimator
        # torch.argmin is a discrete lookup with no gradient, so gradients from the decoder
        # would stop here and never reach the encoder. The line below makes
        # d quantized / d inputs = 1, letting gradients pass through to `inputs` and on to the encoder.
        quantized = inputs + (quantized - inputs).detach()
        return quantized, indices, total_loss

class ResidualVQ(nn.Module):
    """
    Residual vector quantization.
    Stacks several VQEmbedding layers, each quantizing the residual left by the previous one.
    Args:
        num_layers: int: how many VQEmbedding layers the pipeline contains, i.e. the number of quantization levels.
        num_embeddings_list: list: one integer per layer (length must equal num_layers) giving each layer's codebook size, e.g. [256, 128, 128].
        embedding_dim: int: the latent dimensionality, shared by all layers.
        commitment_cost: float: default commitment-loss weight, passed down to every VQEmbedding it creates.
    """
    def __init__(self, num_layers: int, num_embeddings_list: list, embedding_dim: int, commitment_cost: float = 0.25):
        super().__init__()
        self.num_layers = num_layers
        self.vq_layers = nn.ModuleList([
            VQEmbedding(num_embeddings_list[i], embedding_dim, commitment_cost)
            for i in range(num_layers)
        ])

    def initialize_from_data(self, data: torch.Tensor):
        """Initialize all codebooks layer by layer."""
        # Work on a copy of the input data
        residual = data.clone()
        # Visit each VQEmbedding layer and initialize it in turn
        for i, vq_layer in enumerate(self.vq_layers):
            print(f"[INFO] Initializing codebook layer {i+1}/{self.num_layers}...")
            vq_layer.initialize_from_data(residual)
            with torch.no_grad():
                quantized, _, _ = vq_layer(residual)  # e_c0
                residual -= quantized                 # r0 - e_c0

    def forward(self, inputs: torch.Tensor, commitment_cost: float = None):
        """
        inputs: torch.Tensor: a batch of latent vectors from the encoder.
        commitment_cost: float = None: optional override that lets the commitment-loss weight change during training.
        """
        residual = inputs
        quantized_total = torch.zeros_like(inputs)
        indices_list = []  # collects each layer's indices (semantic codes)
        loss_total = 0.0
        for vq_layer in self.vq_layers:
            # Support a dynamically supplied commitment_cost
            if commitment_cost is not None:
                vq_layer.commitment_cost = commitment_cost
            quantized, indices, loss = vq_layer(residual)
            # Out-of-place subtraction: an in-place update here would mutate a tensor
            # saved for backward and break autograd
            residual = residual - quantized
            quantized_total += quantized
            indices_list.append(indices)
            loss_total += loss
        """Stack the per-layer 1D index tensors into a 2D tensor of shape [batch_size, num_layers]: the final multi-level semantic ID."""
        return quantized_total, torch.stack(indices_list, dim=1), loss_total

# --- 2. The main model ---
# Assemble the full RQ-VAE from the building blocks defined above.

class RQVAE(nn.Module):
    """
    The complete residual-quantized variational autoencoder (RQ-VAE),
    built by composing RQEncoder, ResidualVQ and RQDecoder.
    """
    def __init__(self, input_dim: int, hidden_dims: list, latent_dim: int,
                 num_vq_layers: int, num_embeddings_list: list, commitment_cost: float = 0.25):
        super().__init__()
        self.encoder = RQEncoder(input_dim, hidden_dims, latent_dim)
        self.vq = ResidualVQ(num_vq_layers, num_embeddings_list, latent_dim, commitment_cost)
        self.decoder = RQDecoder(latent_dim, hidden_dims, output_dim=input_dim)

    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Encode the input into latent space."""
        return self.encoder(x)

    def decode(self, z_q: torch.Tensor) -> torch.Tensor:
        """Decode from the quantized latent representation."""
        return self.decoder(z_q)

    def forward(self, x: torch.Tensor, commitment_cost: float = None):
        """The full forward pass."""
        z_e = self.encode(x)
        z_q, indices, vq_loss = self.vq(z_e, commitment_cost)
        x_recon = self.decode(z_q)
        recon_loss = F.mse_loss(x_recon, x)
        total_loss = recon_loss + vq_loss
        return {
            'x_recon': x_recon,
            'indices': indices,
            'total_loss': total_loss,
            'recon_loss': recon_loss,
            'vq_loss': vq_loss
        }

    @torch.no_grad()
    def get_semantic_ids(self, x: torch.Tensor) -> torch.Tensor:
        """(Inference) Fetch the semantic IDs for the input."""
        self.eval()
        z_e = self.encode(x)
        _, indices, _ = self.vq(z_e)
        return indices

    def initialize_codebooks(self, dataloader, device, max_samples=100000):
        """Initialize all codebooks from the dataset; a crucial step before training."""
        print("\n[IMPORTANT] Collecting data for codebook initialization...")
        init_data_list = []
        total_samples = 0
        # Switch to eval mode to disable the training behavior of BN and similar layers
        self.encoder.eval()
        with torch.no_grad():
            for batch in tqdm(dataloader, desc="Collecting data"):
                # Tolerate several DataLoader output formats
                emb_batch = batch[1] if isinstance(batch, (list, tuple)) else batch
                emb_batch = emb_batch.to(device)
                z_e = self.encoder(emb_batch)
                init_data_list.append(z_e.cpu())
                total_samples += z_e.shape[0]
                if total_samples >= max_samples:
                    break
        init_data = torch.cat(init_data_list, dim=0)
        self.vq.initialize_from_data(init_data)
        print("[SUCCESS] All codebooks initialized with data.")
Token Mixing¶
import torch
import torch.nn as nn

class RankMixerTokenMixing(nn.Module):
    """
    The paper's original Token Mixing: a completely parameter-free block-matrix transpose.
    Core operation (Eq. 3+4):
    view X ∈ R^(T×D) as a T×H block matrix (each block d_h-dimensional),
    block-transpose it into an H×T block matrix, and reshape to H×(T·d_h).
    With H=T the output shape stays (T, D), so the residual connection holds.
    Parameters: zero (parameter-free)
    """
    def __init__(self, dim: int, num_tokens: int):
        super().__init__()
        # The paper fixes H = T
        H = num_tokens
        assert dim % H == 0, f"dim({dim}) must be divisible by num_tokens/H({H})"
        self.T = num_tokens
        self.H = H           # H = T
        self.d_h = dim // H  # per-head width
        self.norm = nn.LayerNorm(dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (B, T, D)
        """
        B, T, D = x.shape
        residual = x
        # Step 1: Split — (B, T, D) → (B, T, H, d_h)
        x = x.view(B, T, self.H, self.d_h)
        # Step 2: block transpose — swap T (dim 1) and H (dim 2)
        # (B, T, H, d_h) → (B, H, T, d_h)
        # matches Eq. (4): s^h = Concat(x_1^h, x_2^h, ..., x_T^h)
        x = x.transpose(1, 2).contiguous()
        # Step 3: Merge — (B, H, T, d_h) → (B, H, T*d_h)
        # since H=T, (B, H, T*d_h) = (B, T, D) ✓ same shape as the input
        x = x.view(B, self.H, T * self.d_h)
        # Step 4: residual + LayerNorm (Eq. 5)
        return self.norm(residual + x)

# ── Check ──
B, T, D = 4, 16, 64  # d_h = 64/16 = 4
model = RankMixerTokenMixing(dim=D, num_tokens=T)
x = torch.randn(B, T, D)
out = model(x)
print(f"input: {x.shape} → output: {out.shape}")  # (4,16,64) → (4,16,64)
input: torch.Size([4, 16, 64]) → output: torch.Size([4, 16, 64])
Logistic Regression¶
import torch

class LogisticRegressionHandmade:
    def __init__(self, input_dim, lr=0.01):
        # 1. Parameter init: a normal distribution helps break symmetry
        self.w = torch.randn(input_dim, 1, requires_grad=True)
        self.b = torch.zeros(1, requires_grad=True)
        self.lr = lr

    def sigmoid(self, z):
        # 2. Hand-written sigmoid
        return 1 / (1 + torch.exp(-z))

    def forward(self, x):
        # 3. Forward pass: z = Xw + b
        z = torch.matmul(x, self.w) + self.b
        return self.sigmoid(z)

    def compute_loss(self, y_pred, y_true):
        # 4. Hand-written numerically guarded BCE loss
        #    epsilon keeps log(0) at bay
        eps = 1e-10
        loss = -torch.mean(y_true * torch.log(y_pred + eps) + (1 - y_true) * torch.log(1 - y_pred + eps))
        return loss

    def train_step(self, x, y):
        # 5. Autograd and parameter update
        y_pred = self.forward(x)
        loss = self.compute_loss(y_pred, y)
        # Backpropagation
        loss.backward()
        # 6. Parameter update (SGD)
        with torch.no_grad():
            self.w -= self.lr * self.w.grad
            self.b -= self.lr * self.b.grad
            # Gradients must be cleared manually, or they accumulate
            self.w.grad.zero_()
            self.b.grad.zero_()
        return loss.item()

# --- Test data ---
torch.manual_seed(42)
X = torch.randn(100, 5)  # 100 samples, 5 features
# Synthetic labels: 1 if the first feature > 0, else 0
y = (X[:, 0:1] > 0).float()
model = LogisticRegressionHandmade(input_dim=5, lr=0.1)
# Training loop
for epoch in range(200):
    current_loss = model.train_step(X, y)
    if epoch % 20 == 0:
        print(f"Epoch {epoch}, Loss: {current_loss:.4f}")
Epoch 0, Loss: 1.0614 Epoch 20, Loss: 0.6243 Epoch 40, Loss: 0.4340 Epoch 60, Loss: 0.3423 Epoch 80, Loss: 0.2923 Epoch 100, Loss: 0.2613 Epoch 120, Loss: 0.2401 Epoch 140, Loss: 0.2244 Epoch 160, Loss: 0.2123 Epoch 180, Loss: 0.2025