4. 数据加载与梯度管理
数据加载¶
PyTorch 的数据加载体系围绕 Dataset 和 DataLoader 两个核心抽象构建。
Dataset 负责"定义数据长什么样",DataLoader 负责"怎么把数据喂给模型"。
理解这一分工是掌握整个数据管线的关键。
import torch
from torch.utils.data import (
Dataset, TensorDataset, Subset, DataLoader,
IterableDataset, SequentialSampler, RandomSampler,
SubsetRandomSampler, WeightedRandomSampler, BatchSampler
)
from torch.utils.data.distributed import DistributedSampler
import numpy as np
torch.utils.data.Dataset¶
Dataset 是所有自定义数据集的抽象基类。继承它时必须实现两个方法:
__len__:告诉外界数据集共有多少条样本__getitem__:根据整数索引取出一条样本
这个设计的核心思想是懒加载(lazy loading)——数据不会在初始化时全部读入内存,
而是在被 DataLoader 按需索取时才执行 __getitem__,这对大规模数据集至关重要。
# torch.utils.data.Dataset: 用于创建自定义数据集的抽象类,必须实现 __len__ 和 __getitem__
class UserItemDataset(Dataset):
"""
模拟推荐系统中的用户-物品交互数据集。
真实场景中 __getitem__ 内可以做特征工程、负采样、数据增强等操作。
"""
def __init__(self, user_ids, item_ids, labels):
# 初始化时只保存引用,不做大量计算
self.user_ids = user_ids
self.item_ids = item_ids
self.labels = labels
def __len__(self):
# DataLoader 通过此方法知道数据集边界,避免越界访问
return len(self.labels)
def __getitem__(self, idx):
# 每次被调用时才取出单条样本,支持懒加载
return {
'user_id': self.user_ids[idx],
'item_id': self.item_ids[idx],
'label': self.labels[idx]
}
# 构造示例数据
N = 1000
user_ids = torch.randint(0, 100, (N,))
item_ids = torch.randint(0, 500, (N,))
labels = torch.randint(0, 2, (N,)).float()
dataset = UserItemDataset(user_ids, item_ids, labels)
print(f"数据集大小: {len(dataset)}")
print(f"第0条样本: {dataset[0]}")
数据集大小: 1000
第0条样本: {'user_id': tensor(70), 'item_id': tensor(257), 'label': tensor(0.)}
torch.utils.data.TensorDataset¶
当数据已经是 Tensor 时,不需要手写 Dataset 子类,TensorDataset 可以直接将多个张量打包。
它的索引逻辑是:对所有传入的张量在第 0 维(样本维)上同步切片,因此所有张量的第 0 维长度必须相同。
这是最轻量的 Dataset 实现,适合中小型已预处理好的数据。
# torch.utils.data.TensorDataset: 当数据已经是 Tensor 时,直接将多个张量打包成 Dataset,
# 内部对所有张量在第 0 维做同步索引,无需手写 __getitem__
features = torch.randn(N, 16) # N 条样本,每条 16 维特征
targets = torch.randint(0, 5, (N,)) # N 个类别标签
tensor_ds = TensorDataset(features, targets)
print(f"数据集大小: {len(tensor_ds)}")
feat_sample, label_sample = tensor_ds[0] # 返回一个元组
print(f"特征形状: {feat_sample.shape}, 标签: {label_sample}")
# 支持切片索引,一次取多条
feats_batch, labels_batch = tensor_ds[0:4]
print(f"批量特征形状: {feats_batch.shape}")
数据集大小: 1000 特征形状: torch.Size([16]), 标签: 2 批量特征形状: torch.Size([4, 16])
torch.utils.data.Subset¶
Subset 用索引列表从一个大 Dataset 中"截取"子集,不复制数据,只是记住哪些下标属于子集。
这在划分训练集/验证集时极为常用,且与随机打乱的 SubsetRandomSampler 配合可以更灵活地控制划分方式。
# torch.utils.data.Subset: 从一个大数据集中按给定索引列表选取子集,
# 不复制底层数据,仅维护一个索引映射,内存开销极低
all_indices = torch.randperm(N).tolist() # 先随机打乱全量索引
split = int(N * 0.8)
train_indices = all_indices[:split] # 80% 作为训练集
val_indices = all_indices[split:] # 20% 作为验证集
train_subset = Subset(tensor_ds, train_indices)
val_subset = Subset(tensor_ds, val_indices)
print(f"训练集大小: {len(train_subset)}") # 800
print(f"验证集大小: {len(val_subset)}") # 200
# Subset 访问时会自动映射回原始 Dataset 的真实下标
feat, label = train_subset[0]
print(f"训练集第0条,特征形状: {feat.shape}")
训练集大小: 800 验证集大小: 200 训练集第0条,特征形状: torch.Size([16])
torch.utils.data.DataLoader¶
DataLoader 是数据管线的调度中枢,它将 Dataset 的单条索引能力升级为批量、并行、可重复的数据流。
其核心工作原理是:Sampler 产生索引序列 → DataLoader 按 batch_size 分组 → 多个 worker 并行调用 __getitem__ → collate_fn 将多条样本拼接为一个 batch → 送入训练循环。
理解这条流水线后,num_workers、pin_memory、collate_fn 等参数的含义就会非常清晰。
# torch.utils.data.DataLoader: 封装 Dataset,提供批量处理、打乱数据、多线程加载等功能,
# 是训练循环中数据侧的核心组件
train_loader = DataLoader(
dataset=train_subset,
batch_size=32, # 每个 mini-batch 包含 32 条样本
shuffle=True, # 每个 epoch 开始时随机打乱顺序(内部用 RandomSampler 实现)
num_workers=2, # 用 2 个子进程并行预取数据,减少 GPU 等待 I/O 的时间
pin_memory=True, # 将数据固定在锁页内存中,加速 CPU→GPU 的数据传输
drop_last=True, # 丢弃最后一个不足 batch_size 的批次,避免 BatchNorm 等对小 batch 敏感
)
val_loader = DataLoader(
dataset=val_subset,
batch_size=64, # 验证时 batch 可以更大,节省时间
shuffle=False, # 验证集不需要打乱,保证结果可复现
num_workers=2,
pin_memory=True,
)
print(f"训练集共 {len(train_loader)} 个 batch")
print(f"验证集共 {len(val_loader)} 个 batch")
# 查看一个 batch 的形状
for feats, labels in train_loader:
print(f"features shape: {feats.shape}") # torch.Size([32, 16])
print(f"labels shape: {labels.shape}") # torch.Size([32])
break # 只看第一个 batch
训练集共 25 个 batch 验证集共 4 个 batch features shape: torch.Size([32, 16]) labels shape: torch.Size([32])
# 自定义 collate_fn 示例:当样本是变长序列时,DataLoader 默认的拼接方式会报错,
# 需要自己写 collate_fn 来做 padding
def custom_collate_fn(batch):
"""
batch 是一个列表,每个元素是 __getitem__ 返回的单条样本。
这里演示将字典列表合并为一个批量字典。
"""
user_ids = torch.stack([item['user_id'] for item in batch])
item_ids = torch.stack([item['item_id'] for item in batch])
labels = torch.stack([item['label'] for item in batch])
return {'user_id': user_ids, 'item_id': item_ids, 'label': labels}
custom_loader = DataLoader(
dataset=dataset, # 使用我们最开始自定义的 UserItemDataset
batch_size=16,
shuffle=True,
collate_fn=custom_collate_fn, # 指定自定义的拼接函数
)
for batch in custom_loader:
print(f"user_id shape: {batch['user_id'].shape}")
print(f"item_id shape: {batch['item_id'].shape}")
print(f"label shape: {batch['label'].shape}")
break
user_id shape: torch.Size([16]) item_id shape: torch.Size([16]) label shape: torch.Size([16])
torch.utils.data.IterableDataset¶
IterableDataset 与 Dataset 的根本区别在于:普通 Dataset 支持随机访问(给定 idx 直接取样本),
而 IterableDataset 只支持顺序迭代,就像读取流数据一样。
它适合两类场景:
- 数据量超大,无法一次性建立全量索引(如 TB 级日志数据)
- 数据是实时流式产生的(如在线学习场景)
使用 IterableDataset 时,不能设置 shuffle=True,也不能用 index-based Sampler。
# torch.utils.data.IterableDataset: 用于创建基于迭代器的数据集,
# 适合大规模数据集或流式数据(无法随机访问、无需建立全量索引)
class StreamDataset(IterableDataset):
def __init__(self, num_samples, feature_dim):
self.num_samples = num_samples
self.feature_dim = feature_dim
def __iter__(self):
# 每次迭代按需生成数据,而非预先全部加载到内存
# 真实场景中这里可以是从 Kafka / HDFS / 数据库逐行读取
for _ in range(self.num_samples):
feature = torch.randn(self.feature_dim)
label = torch.randint(0, 2, (1,)).squeeze()
yield feature, label
stream_ds = StreamDataset(num_samples=200, feature_dim=8)
# 注意:IterableDataset 配合 DataLoader 时不能 shuffle,num_workers>1 时需处理数据切分
stream_loader = DataLoader(stream_ds, batch_size=16)
for feats, labels in stream_loader:
print(f"batch features shape: {feats.shape}")
break
batch features shape: torch.Size([16, 8])
Sampler 体系¶
Sampler 决定了 DataLoader 以什么顺序向 Dataset 请求样本索引。可以将 Sampler 理解为一个"索引生成器": 每次 epoch 开始时,DataLoader 先向 Sampler 要一批索引,再拿着这些索引去 Dataset 取数据。
不同 Sampler 的选择直接影响训练效果,例如对不均衡数据集使用 WeightedRandomSampler 可以显著缓解类别偏置。
# torch.utils.data.SequentialSampler: 按顺序依次抽取样本索引(0, 1, 2, ...),不进行打乱,
# 是验证/测试集的默认采样方式,保证评估结果的可复现性
seq_sampler = SequentialSampler(tensor_ds)
indices = list(seq_sampler)
print(f"前5个索引(顺序): {indices[:5]}") # [0, 1, 2, 3, 4]
# torch.utils.data.RandomSampler: 随机抽取样本,每个 epoch 的数据顺序都会被打乱,
# 是训练集的默认采样方式(DataLoader 设置 shuffle=True 时内部即使用此 Sampler)
rand_sampler = RandomSampler(tensor_ds)
rand_indices = list(rand_sampler)
print(f"前5个索引(随机): {rand_indices[:5]}") # 每次运行结果不同
前5个索引(顺序): [0, 1, 2, 3, 4] 前5个索引(随机): [71, 795, 476, 944, 378]
# torch.utils.data.SubsetRandomSampler: 在给定的索引子集内随机抽取,
# 常用于划分训练集/验证集:先确定哪些下标属于训练集,再在训练集内部随机打乱
# 与 Subset 相比,SubsetRandomSampler 不创建新 Dataset 对象,更轻量
train_idx = list(range(0, 800))
val_idx = list(range(800, 1000))
train_sampler = SubsetRandomSampler(train_idx) # 在前800条中随机采样
val_sampler = SubsetRandomSampler(val_idx) # 在后200条中随机采样
train_loader_v2 = DataLoader(tensor_ds, batch_size=32, sampler=train_sampler)
val_loader_v2 = DataLoader(tensor_ds, batch_size=64, sampler=val_sampler)
print(f"训练批次数: {len(train_loader_v2)}")
print(f"验证批次数: {len(val_loader_v2)}")
训练批次数: 25 验证批次数: 4
# torch.utils.data.WeightedRandomSampler: 根据每个样本的权重进行有放回随机抽取,
# 是处理类别不均衡问题的重要工具——给少数类更高权重,使模型见到更多少数类样本
# 假设 N=1000 条数据中,正样本(label=1)仅占 10%
imbalanced_labels = torch.cat([torch.zeros(900), torch.ones(100)]) # 900负:100正
# 为每个类别计算权重:类别权重 = 1 / 该类别的样本数
class_counts = torch.tensor([900.0, 100.0])
class_weights = 1.0 / class_counts # [1/900, 1/100]
# 将每个样本映射到对应的类别权重
sample_weights = class_weights[imbalanced_labels.long()]
print(f"正样本权重: {sample_weights[900]:.6f}, 负样本权重: {sample_weights[0]:.6f}")
# 正样本权重是负样本的 9 倍,保证两类被等概率采样
weighted_sampler = WeightedRandomSampler(
weights=sample_weights,
num_samples=len(sample_weights), # 每个 epoch 采样的总样本数
replacement=True # 有放回采样(少数类样本会被重复采到)
)
# 验证采样结果是否均衡
imb_ds = TensorDataset(torch.randn(1000, 8), imbalanced_labels)
balanced_loader = DataLoader(imb_ds, batch_size=100, sampler=weighted_sampler)
for _, lbls in balanced_loader:
pos_ratio = lbls.sum() / len(lbls)
print(f"batch 中正样本比例: {pos_ratio:.2f}") # 应接近 0.5,而非原始的 0.1
break
正样本权重: 0.010000, 负样本权重: 0.001111 batch 中正样本比例: 0.44
# torch.utils.data.BatchSampler: 将另一个 Sampler 产生的单条索引分组成批次索引,
# 通常不直接使用,DataLoader 内部会自动将 Sampler 包装为 BatchSampler
# 但显式使用可以实现动态 batch size 等高级需求
base_sampler = RandomSampler(tensor_ds)
batch_sampler = BatchSampler(
sampler=base_sampler,
batch_size=32,
drop_last=False
)
print(f"总批次数: {len(batch_sampler)}") # ceil(1000/32) = 32
# 查看第一个 batch 的索引(一批32个索引,随机排列)
first_batch_indices = next(iter(batch_sampler))
print(f"第一个 batch 的前5个索引: {first_batch_indices[:5]}")
总批次数: 32 第一个 batch 的前5个索引: [784, 280, 145, 425, 431]
# torch.utils.data.distributed.DistributedSampler: 分布式训练专用采样器,
# 将数据集均匀切分给每个进程(rank),确保各进程看到不同的数据分片,避免重复计算
# 注意:每个 epoch 开始前必须调用 sampler.set_epoch(epoch) 保证各 epoch 的随机性
# 以下代码展示单机模拟(真实使用需在 torch.distributed 初始化后运行)
# 假设共 4 个 GPU 进程,当前进程是第 0 个
dist_sampler = DistributedSampler(
dataset=tensor_ds,
num_replicas=4, # 进程总数(world_size)
rank=0, # 当前进程编号
shuffle=True,
drop_last=False
)
print(f"每个进程分配的样本数: {len(dist_sampler)}") # ceil(1000/4) = 250
# 训练循环中的标准用法
# for epoch in range(num_epochs):
# dist_sampler.set_epoch(epoch) # 关键!保证每 epoch 的打乱方式不同
# for batch in dist_loader:
# ...
每个进程分配的样本数: 250
梯度管理¶
PyTorch 的自动微分系统(Autograd)是深度学习框架的核心引擎。
理解它的关键在于理解计算图(Computation Graph)的概念:
每次对 requires_grad=True 的张量执行运算时,PyTorch 会悄悄在背后记录这次运算,
构建一张有向无环图(DAG)。当调用 .backward() 时,系统沿这张图反向遍历,
用链式法则自动计算每个叶节点的梯度。
掌握梯度管理,就是掌握如何控制这张图的构建与销毁。
import torch
import torch.nn as nn
import torch.autograd as autograd
# tensor.grad: 获取张量的梯度,存储在 .grad 属性中
# 只有满足以下条件的张量才会积累梯度:
# 1. requires_grad=True
# 2. 是叶节点(leaf node,即用户直接创建的张量,而非中间运算结果)
x = torch.tensor([2.0, 3.0], requires_grad=True) # 叶节点
y = x[0] ** 2 + x[1] ** 3 # y = x0^2 + x1^3
y.backward()
print(f"dy/dx0 = 2*x0 = {x.grad[0]}") # 2*2 = 4.0
print(f"dy/dx1 = 3*x1^2 = {x.grad[1]}") # 3*9 = 27.0
# 注意:非叶节点(中间结果)默认不保留 .grad,这是为了节省内存
print(f"y 的 grad_fn(说明它是中间节点): {y.grad_fn}")
print(f"y 是否是叶节点: {y.is_leaf}")
dy/dx0 = 2*x0 = 4.0 dy/dx1 = 3*x1^2 = 27.0 y 的 grad_fn(说明它是中间节点): <AddBackward0 object at 0x0000023CB7EB3E50> y 是否是叶节点: False
# tensor.retain_grad: 保留中间节点(非叶节点)的梯度
# 默认情况下,反向传播结束后只有叶节点保留梯度,中间节点的梯度被释放以节省内存
# 调用 retain_grad() 后,该中间节点也会在反向传播后保留其梯度值
x = torch.tensor([2.0], requires_grad=True)
y = x * 3 # 中间节点
z = y ** 2 # 中间节点(最终标量)
y.retain_grad() # 在 backward 前声明要保留 y 的梯度
z.backward()
print(f"dz/dy = 2*y = {y.grad}") # 2 * 6 = 12.0
print(f"dz/dx = dz/dy * dy/dx = {x.grad}") # 12 * 3 = 36.0
dz/dy = 2*y = tensor([12.]) dz/dx = dz/dy * dy/dx = tensor([36.])
# torch.no_grad: 上下文管理器,在其作用域内的所有运算都不会被记录进计算图,
# 即使输入张量有 requires_grad=True,输出张量也不会带有 grad_fn
# 主要用于:1. 验证/推理阶段(节省内存+加速) 2. 参数更新后的后处理操作
x = torch.tensor([2.0], requires_grad=True)
with torch.no_grad():
y = x * 3
print(f"no_grad 内:y.requires_grad = {y.requires_grad}") # False
print(f"no_grad 内:y.grad_fn = {y.grad_fn}") # None
# tensor.detach: 从计算图中分离出一个张量,返回一个共享数据但不参与梯度计算的新张量
# 与 no_grad 的区别:detach 是对单个张量操作,no_grad 是对整个代码块操作
x = torch.tensor([2.0], requires_grad=True)
y = x * 3
z = y.detach() # z 和 y 共享内存,但 z 没有梯度
print(f"detach 后:z.requires_grad = {z.requires_grad}") # False
print(f"detach 后:z.grad_fn = {z.grad_fn}") # None
print(f"z 的值: {z}") # 仍然是 6.0,共享底层数据
no_grad 内:y.requires_grad = False no_grad 内:y.grad_fn = None detach 后:z.requires_grad = False detach 后:z.grad_fn = None z 的值: tensor([6.])
# tensor.requires_grad_: 原地(in-place)修改张量的 requires_grad 属性
# 常见用途:对预训练模型的部分层进行"冻结"(freeze),只训练新增的头部层
# 模拟冻结预训练 Embedding 的场景
pretrained_emb = nn.Embedding(1000, 64)
pretrained_emb.weight.requires_grad_(False) # 冻结,不参与梯度更新
print(f"冻结后 requires_grad: {pretrained_emb.weight.requires_grad}")
# 验证:冻结的参数不会出现在 optimizer 的参数列表中(需手动过滤)
trainable_params = [p for p in pretrained_emb.parameters() if p.requires_grad]
print(f"可训练参数数量: {len(trainable_params)}") # 0,整层被冻结
# 解冻
pretrained_emb.weight.requires_grad_(True)
print(f"解冻后 requires_grad: {pretrained_emb.weight.requires_grad}")
冻结后 requires_grad: False 可训练参数数量: 0 解冻后 requires_grad: True
# tensor.backward: 在标量张量上执行反向传播,计算计算图中所有叶节点的梯度
# 若输出非标量,需传入与输出同形状的 gradient 张量(作为链式法则的初始梯度)
# 标量情况(最常见)
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
loss = (x ** 2).sum() # 标量损失
loss.backward() # 等价于 loss.backward(torch.tensor(1.0))
print(f"标量情况 x.grad = {x.grad}") # [2., 4., 6.]
# 非标量情况:需要提供 gradient 参数(雅可比向量积,JVP)
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x ** 2 # 向量输出,形状与 x 相同
v = torch.tensor([1.0, 0.5, 0.1]) # 外部传入的梯度权重
y.backward(gradient=v) # 计算 (dy/dx) * v,即加权梯度
print(f"非标量情况 x.grad = {x.grad}") # [2*1, 4*0.5, 6*0.1] = [2., 2., 0.6]
标量情况 x.grad = tensor([2., 4., 6.]) 非标量情况 x.grad = tensor([2.0000, 2.0000, 0.6000])
# torch.autograd: PyTorch 自动微分的核心模块,提供自动求导功能
# 每次前向计算时,autograd 引擎悄悄构建一张动态计算图(DAG),
# backward 时沿图反向遍历,用链式法则逐层计算梯度
# 查看计算图结构(grad_fn 链)
x = torch.tensor([2.0], requires_grad=True)
y = x * 3
z = y ** 2
w = z.sum()
print(f"w.grad_fn: {w.grad_fn}") # SumBackward0
print(f"z.grad_fn: {z.grad_fn}") # PowBackward0
print(f"y.grad_fn: {y.grad_fn}") # MulBackward0
print(f"x.grad_fn: {x.grad_fn}") # None(叶节点没有 grad_fn)
# 可以看到每个中间节点都记录了"我是由什么运算得来的",这就是反向传播的路径
w.grad_fn: <SumBackward0 object at 0x0000023CF00ABE50> z.grad_fn: <PowBackward0 object at 0x0000023CF00ABC70> y.grad_fn: <MulBackward0 object at 0x0000023CF00AB880> x.grad_fn: None
# torch.autograd.grad: 直接计算并返回指定输出对指定输入的梯度,
# 与 .backward() 不同,它不会将梯度累加到 .grad 属性,而是作为返回值返回
# 适合高阶导数、梯度惩罚(如 WGAN-GP)等需要将梯度本身参与计算的场景
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = (x ** 3).sum() # y = x0^3 + x1^3 + x2^3
# 计算一阶梯度(create_graph=True 表示对梯度本身也构建计算图,用于后续高阶求导)
grad1 = autograd.grad(outputs=y, inputs=x, create_graph=True)[0]
print(f"一阶梯度 dy/dx = 3x^2 = {grad1}") # [3., 12., 27.]
# 利用 create_graph=True,继续对一阶梯度求导得到二阶梯度
grad2 = autograd.grad(outputs=grad1.sum(), inputs=x)[0]
print(f"二阶梯度 d^2y/dx^2 = 6x = {grad2}") # [6., 12., 18.]
一阶梯度 dy/dx = 3x^2 = tensor([ 3., 12., 27.], grad_fn=<MulBackward0>) 二阶梯度 d^2y/dx^2 = 6x = tensor([ 6., 12., 18.])
# torch.autograd.gradcheck: 用数值微分(有限差分法)验证自定义 autograd 运算的梯度正确性,
# 是开发自定义算子时必不可少的调试工具
def my_func(x):
# 一个简单的函数:y = x^3
return x ** 3
# gradcheck 要求输入是 double 精度,并且 requires_grad=True
x = torch.tensor([1.0, 2.0], dtype=torch.float64, requires_grad=True)
is_correct = autograd.gradcheck(my_func, x, eps=1e-6, atol=1e-4)
print(f"梯度验证通过: {is_correct}") # True
# 如果自定义的 backward 实现有 bug,gradcheck 会抛出异常告知具体偏差
梯度验证通过: True
# torch.autograd.Variable: 早期 PyTorch(< 0.4)中用于包装张量并记录梯度历史的类,
# 现已废弃——从 0.4 版本起,Tensor 本身就支持 requires_grad,Variable 仅作向后兼容保留
# 如果你在旧代码中看到它,可以安全地将 Variable(x) 替换为 x
from torch.autograd import Variable
x = torch.tensor([1.0, 2.0])
old_style = Variable(x, requires_grad=True) # 旧写法
new_style = torch.tensor([1.0, 2.0], requires_grad=True) # 等价的新写法
print(f"旧写法类型: {type(old_style)}") # <class 'torch.Tensor'>,本质还是 Tensor
print(f"两者等价: {torch.equal(old_style.data, new_style.data)}")
旧写法类型: <class 'torch.Tensor'> 两者等价: True
# torch.autograd.backward: torch.autograd 模块级别的 backward 函数,
# 与 tensor.backward() 等价,但可以同时对多个张量/多个梯度进行反向传播
# 适合多任务学习、多输出模型等场景
x = torch.tensor([2.0, 3.0], requires_grad=True)
loss1 = (x[0] ** 2) # 任务1损失
loss2 = (x[1] ** 2) # 任务2损失
# 同时对 loss1 和 loss2 进行反向传播,梯度会累加到 x.grad
autograd.backward(
tensors=[loss1, loss2],
grad_tensors=[torch.tensor(1.0), torch.tensor(1.0)] # 各损失对应的权重
)
print(f"x.grad = {x.grad}") # [2*2, 2*3] = [4., 6.]
x.grad = tensor([4., 6.])
# torch.autograd.Function: 通过自定义 forward 和 backward 来创建不可微或需要特殊梯度逻辑的运算,
# 典型用途:直通估计器(Straight-Through Estimator)、量化感知训练、稀疏操作等
class StraightThroughStep(autograd.Function):
"""
直通估计器:前向是阶跃函数(不可微),反向梯度直接传递(假装导数为1)。
常用于二值神经网络(BNN)的训练。
"""
@staticmethod
def forward(ctx, x):
# ctx 用于在 forward 和 backward 之间传递信息
ctx.save_for_backward(x) # 保存 x 供 backward 使用
return (x > 0).float() # 阶跃:>0 取1,否则取0
@staticmethod
def backward(ctx, grad_output):
x, = ctx.saved_tensors
# 直通:梯度直接通过,相当于 d(step)/dx ≈ 1(在 [-1, 1] 范围内)
grad_input = grad_output.clone()
grad_input[x.abs() > 1] = 0 # 超出范围则梯度归零(防止梯度爆炸)
return grad_input
ste = StraightThroughStep.apply
x = torch.tensor([-0.5, 0.3, 1.2, -0.8], requires_grad=True)
y = ste(x) # 前向:阶跃
print(f"前向输出: {y}")
y.sum().backward()
print(f"反向梯度: {x.grad}") # 直通:在 [-1,1] 内为1,超出为0
前向输出: tensor([0., 1., 1., 0.], grad_fn=<StraightThroughStepBackward>) 反向梯度: tensor([1., 1., 0., 1.])
# torch.autograd.functional: 提供函数式接口,用于计算更复杂的梯度操作,
# 如雅可比矩阵(jacobian)和黑塞矩阵(hessian),常用于元学习和二阶优化
from torch.autograd.functional import jacobian, hessian
def f(x):
# f(x) = [x0^2 + x1, x0 * x1^2]
return torch.stack([x[0]**2 + x[1], x[0] * x[1]**2])
x = torch.tensor([1.0, 2.0])
# 计算雅可比矩阵:J[i,j] = df_i / dx_j
J = jacobian(f, x)
print(f"雅可比矩阵:\n{J}")
# [[2*x0, 1], = [[2., 1.],
# [x1^2, 2*x0*x1]] [4., 4.]]
# 计算标量函数的黑塞矩阵:H[i,j] = d^2f / (dx_i dx_j)
def g(x):
return (x[0]**2 * x[1] + x[1]**3).sum()
H = hessian(g, x)
print(f"黑塞矩阵:\n{H}")
雅可比矩阵:
tensor([[2., 1.],
[4., 4.]])
黑塞矩阵:
tensor([[ 4., 2.],
[ 2., 12.]])
# optimizer.zero_grad / tensor.grad.zero_: 梯度清零
# 这是训练循环中极容易遗忘的步骤。PyTorch 默认会累加(accumulate)梯度而非覆盖,
# 这是有意为之的设计,方便梯度累积(gradient accumulation)技巧,
# 但在普通训练中每步更新前必须手动清零,否则梯度会越来越大。
import torch.optim as optim
model = nn.Linear(4, 2)
optimizer = optim.SGD(model.parameters(), lr=0.01)
x = torch.randn(8, 4)
labels = torch.randint(0, 2, (8,))
criterion = nn.CrossEntropyLoss()
# 标准训练步骤
for step in range(3):
optimizer.zero_grad() # 第1步:清零梯度(必须在 backward 之前)
out = model(x) # 第2步:前向传播
loss = criterion(out, labels) # 第3步:计算损失
loss.backward() # 第4步:反向传播
optimizer.step() # 第5步:参数更新
print(f"Step {step}, loss = {loss.item():.4f}")
# 也可以对单个参数张量直接清零(不使用 optimizer 的情况下)
x = torch.tensor([1.0, 2.0], requires_grad=True)
y = (x ** 2).sum()
y.backward()
print(f"清零前 x.grad = {x.grad}")
x.grad.zero_() # 原地清零
print(f"清零后 x.grad = {x.grad}")
Step 0, loss = 0.9879 Step 1, loss = 0.9794 Step 2, loss = 0.9711 清零前 x.grad = tensor([2., 4.]) 清零后 x.grad = tensor([0., 0.])
# torch.nn.utils.clip_grad_norm_: 对所有参数梯度的 L2 范数进行裁剪
# 当总梯度范数超过 max_norm 时,将所有梯度等比例缩小,使总范数等于 max_norm
# 这是防止梯度爆炸(gradient explosion)的标准手段,在 RNN/LSTM/Transformer 训练中几乎必用
model = nn.LSTM(input_size=8, hidden_size=16, batch_first=True)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
x = torch.randn(4, 10, 8)
out, _ = model(x)
loss = out.sum()
loss.backward()
# 裁剪前:查看梯度范数
total_norm_before = sum(p.grad.norm() ** 2 for p in model.parameters() if p.grad is not None) ** 0.5
print(f"裁剪前梯度 L2 范数: {total_norm_before:.4f}")
# 裁剪:将所有参数梯度的整体 L2 范数限制在 1.0 以内
nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
total_norm_after = sum(p.grad.norm() ** 2 for p in model.parameters() if p.grad is not None) ** 0.5
print(f"裁剪后梯度 L2 范数: {total_norm_after:.4f}") # 不超过 1.0
optimizer.step()
optimizer.zero_grad()
# torch.nn.utils.clip_grad_value_: 将每个参数梯度的每个元素值裁剪到 [-clip_value, clip_value]
# 与 clip_grad_norm_ 的区别:norm 版本保留梯度方向(等比缩放),value 版本直接截断每个元素
model = nn.Linear(4, 2)
x = torch.randn(8, 4)
model(x).sum().backward()
nn.utils.clip_grad_value_(model.parameters(), clip_value=0.1)
for name, p in model.named_parameters():
if p.grad is not None:
print(f"{name}: grad max = {p.grad.abs().max():.4f}") # 不超过 0.1
裁剪前梯度 L2 范数: 88.3250 裁剪后梯度 L2 范数: 1.0000 weight: grad max = 0.1000 bias: grad max = 0.1000
# torch.set_grad_enabled(False/True): 全局开关,动态禁用或启用梯度计算
# 与 torch.no_grad() 上下文管理器的区别:
# - no_grad() 是上下文管理器,仅在 with 块内生效
# - set_grad_enabled() 是全局开关,直到下次调用前一直有效
# 常见用途:根据 is_training 标志动态切换模式(如在封装好的 evaluate 函数中)
x = torch.tensor([2.0], requires_grad=True)
# 禁用梯度
torch.set_grad_enabled(False)
y = x * 3
print(f"禁用后 y.requires_grad = {y.requires_grad}") # False
print(f"禁用后 y.grad_fn = {y.grad_fn}") # None
# 重新启用梯度
torch.set_grad_enabled(True)
z = x * 3
print(f"启用后 z.requires_grad = {z.requires_grad}") # True
print(f"启用后 z.grad_fn = {z.grad_fn}") # MulBackward0
# 最佳实践:结合 model.eval() 使用,确保验证阶段完全关闭梯度
def evaluate(model, loader):
model.eval() # 关闭 Dropout、BatchNorm 的训练行为
with torch.no_grad(): # 关闭梯度追踪,节省内存约50%
for batch in loader:
pass # 推理逻辑
model.train() # 评估结束后恢复训练模式
禁用后 y.requires_grad = False 禁用后 y.grad_fn = None 启用后 z.requires_grad = True 启用后 z.grad_fn = <MulBackward0 object at 0x0000023C87405D50>