1.迁移学习 做图像识别是不是每次都意味着要自定义模型,确定好 卷积层 、池化层 和 全连接层 ,然后从0开始训练?不同类型的图像数据集会不会有通用的特征?如果训练样本比较少怎么办?是不是可以把已经训练好的模型,拿过来稍微改改就可以使用的?
迁移学习(Transfer Learning) 就是这种可以将已有模型的知识迁移到新任务的学习方法,具有以下特点:
适合数据量比较少的任务,但是有预训练知识,无需从头训练深度神经网络
更快的训练速度和更少的资源消耗
避免过拟合,训练效果更好,预训练的模型已经在大数据集上学到了通用特征 (如边缘、形状、颜色)
迁移学习常见的使用方式:
冻结预训练模型的大部分参数,仅训练全连接层(FC 层),这样做的优点是训练快,适合小数据集,缺点是预训练模型的特征可能不完全适用于新任务。
解冻部分或全部预训练层,使其适应新任务,优点是特征提取灵活,能学到更多任务相关的特征,缺点是训练时间长,需要手动调节学习率。
下面通过一个小样本的花卉识别来介绍一下迁移学习的训练流程。
2.数据处理 1.数据结构 训练集和验证集都是图片,每一个类型的花放在一个文件夹中,文件夹的名称可以通过cat_to_name.json
文件匹配对应的花名称。
2.数据加载与预处理 由于本次任务训练集数据比较少,为了提高模型的泛化能力和训练效果,需要在做数据预处理的时候做下增强操作。 预处理:
transforms.Resize([96, 96]) :将图片缩放到 96x96 的大小,统一输入尺寸,保证模型输入一致
transforms.ToTensor() :将图像从 PIL 图像或 NumPy 数组转换为 PyTorch 的 Tensor 格式 增强:
transforms.RandomRotation(45) :对图像进行随机旋转,角度范围是 -45 到 45 度之间。目的是增加模型对旋转变化的鲁棒性
transforms.CenterCrop(64) :从图像中心裁剪出 64x64 的区域。减少输入图像的多余部分,增强模型对图像的中心部分的学习
transforms.RandomHorizontalFlip(p=0.5) :有 50% 的概率对图像进行水平翻转,能提高模型对图像左右方向不对称的适应能力
transforms.RandomVerticalFlip(p=0.5) :有 50% 的概率对图像进行垂直翻转,进一步增强模型对图像的多样化学习
transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1) :对图像的亮度、对比度、饱和度和色调进行随机调整,增强模型对不同光照条件和颜色变化的鲁棒性
transforms.RandomGrayscale(p=0.025) :有 2.5% 的概率将图像转为灰度图,这样模型能学到更多的图像特征,而不仅仅依赖颜色信息
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) :标准化图像数据,使用 ImageNet 数据集的均值和标准差。标准化可以加速模型训练并提高收敛速度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def data_transforms (): return { 'train' : transforms.Compose([ transforms.Resize([96 , 96 ]), transforms.RandomRotation(45 ), transforms.CenterCrop(64 ), transforms.RandomHorizontalFlip(p=0.5 ), transforms.RandomVerticalFlip(p=0.5 ), transforms.ColorJitter(brightness=0.2 , contrast=0.1 , saturation=0.1 , hue=0.1 ), transforms.RandomGrayscale(p=0.025 ), transforms.ToTensor(), transforms.Normalize([0.485 , 0.456 , 0.406 ], [0.229 , 0.224 , 0.225 ]) ]), 'valid' : transforms.Compose([ transforms.Resize([64 , 64 ]), transforms.ToTensor(), transforms.Normalize([0.485 , 0.456 , 0.406 ], [0.229 , 0.224 , 0.225 ]) ]), }
1 2 3 4 5 6 7 8 def load_data_loader (): batch_size = 128 image_datasets = {x: datasets.ImageFolder(os.path.join(Path("./data/" ), x), data_transforms()[x]) for x in ['train' , 'valid' ]} dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True ) for x in ['train' , 'valid' ]} return dataloaders['train' ], dataloaders['valid' ]
1 2 train_dl, valid_dl = load_data_loader()
3.绘制部分预处理数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def denormalize (image ): mean = np.array([0.485 , 0.456 , 0.406 ]) std = np.array([0.229 , 0.224 , 0.225 ]) image = image.numpy().transpose((1 , 2 , 0 )) image = std * image + mean image = np.clip(image, 0 , 1 ) return image def draw_dl_8img (data_loader ): images, labels = next (iter (data_loader)) fig, axes = plt.subplots(2 , 4 , figsize=(12 , 6 )) axes = axes.flatten() for i in range (8 ): img = denormalize(images[i]) axes[i].imshow(img) axes[i].axis("off" ) axes[i].set_title(f"Label: {labels[i].item()} " ) plt.tight_layout() plt.show() draw_dl_8img(train_dl)
3.模型配置 1.ResNet-18 ResNet-18 可以用在别的任务上是因为它在 ImageNet 这样的超大规模数据集上训练过,学习到了通用的特征。 在深度学习模型训练中前几层通常学习 通用的低级特征 (比如边缘、纹理、形状等),而后几层学习 高级的特定特征 (比如猫的耳朵、人脸的结构)。前几层的特征对不同任务是通用的,因此可以在其他数据集上直接使用,只需调整最后几层(或者不调整)就能适应新的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 class BasicBlock (nn.Module): """ResNet 的基本残差块(适用于 ResNet-18 和 ResNet-34)""" expansion = 1 def __init__ (self, in_channels, out_channels, stride=1 , downsample=None ): super (BasicBlock, self ).__init__() self .conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3 , stride=stride, padding=1 , bias=False ) self .bn1 = nn.BatchNorm2d(out_channels) self .relu = nn.ReLU(inplace=True ) self .conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3 , stride=1 , padding=1 , bias=False ) self .bn2 = nn.BatchNorm2d(out_channels) self .downsample = downsample def forward (self, x ): identity = x if self .downsample is not None : identity = self .downsample(x) out = self .conv1(x) out = self .bn1(out) out = self .relu(out) out = self .conv2(out) out = self .bn2(out) out += identity out = self .relu(out) return out class ResNet (nn.Module): """ResNet-18 模型""" def __init__ (self, num_classes=1000 ): super (ResNet, self ).__init__() self .conv1 = nn.Conv2d(3 , 64 , kernel_size=7 , stride=2 , padding=3 , bias=False ) self .bn1 = nn.BatchNorm2d(64 ) self .relu = nn.ReLU(inplace=True ) self .maxpool = nn.MaxPool2d(kernel_size=3 , stride=2 , padding=1 ) self .layer1 = self ._make_layer(64 , 64 , 2 , stride=1 ) self .layer2 = self ._make_layer(64 , 128 , 2 , stride=2 ) self .layer3 = self ._make_layer(128 , 256 , 2 , stride=2 ) self .layer4 = self ._make_layer(256 , 512 , 2 , stride=2 ) self .avgpool = nn.AdaptiveAvgPool2d((1 , 1 )) self .fc = nn.Linear(512 , num_classes) def _make_layer (self, in_channels, out_channels, blocks, stride ): """构建一个 ResNet 层""" downsample = None if stride != 1 or in_channels != out_channels: downsample = nn.Sequential( nn.Conv2d(in_channels, out_channels, kernel_size=1 , stride=stride, bias=False ), nn.BatchNorm2d(out_channels), ) layers = [] layers.append(BasicBlock(in_channels, out_channels, stride, downsample)) for _ in range (1 , blocks): layers.append(BasicBlock(out_channels, out_channels)) return nn.Sequential(*layers) def forward (self, x ): x = self .conv1(x) x = self .bn1(x) x = self .relu(x) x = self .maxpool(x) x = self .layer1(x) x = self .layer2(x) x = self .layer3(x) x = self .layer4(x) x = self .avgpool(x) x = torch.flatten(x, 1 ) x = self .fc(x) return x
上面是ResNet-18 模型的结构,了解即可。因为ResNet-18 已经被 PyTorch 的 torchvision.models 模块包含并预定义了,我们无需手动定义整个模型结构,可直接使用它。
2.自定义ResNet-18 初始化一个预训练的 ResNet-18 模型,并修改它用于新的分类任务
1 2 3 4 5 6 7 8 def init_model (num_classes ): model_ft = models.resnet18(weights="DEFAULT" ) for param in model_ft.parameters(): param.requires_grad = False model_ft.fc = nn.Linear(model_ft.fc.in_features, num_classes) model_ft.to(get_device()) return model_ft
models.resnet18(weights="DEFAULT"
:加载 ResNet-18 预训练模型
param.requires_grad = False
: 冻结 ResNet-18 的所有层,防止其参数被训练
nn.Linear(model_ft.fc.in_features, num_classes)
:修改 ResNet 的最后一层,让它适应新的分类任务,因为当前训练集是102分类,num_classes
为102,表示最后全连接层输出102个类别
model_ft.to(get_device())
:让模型运行在 GPU 或 CPU上
经过这样自定义后, ResNet-18模型只会训练最后的全连接层。
3.优化器 优化器还是使用Adam
Adam 是一种常用的优化算法,全名为 Adaptive Moment Estimation ,它结合了 Momentum 和 RMSprop 的优点,能够自动调整学习率来提高训练效果,Adam 优化器会通过计算一阶矩(梯度的均值)和二阶矩(梯度的平方均值)来动态调整每个参数的学习率,从而提高训练的效率和稳定性。
适用范围:一种自适应优化算法,非常适用于大多数任务,尤其是当模型参数多,训练过程复杂的时候。
在定义Adam
优化器的时候需要注意,只有未冻结的参数才需要训练,冻结的参数不会被训练优化
1 2 3 4 5 6 def get_optimizer (_model_resnet ): params_to_update = [] for name, param in _model_resnet.named_parameters(): if param.requires_grad: params_to_update.append(param) return optim.Adam(params_to_update, lr=1e-2 )
4.学习率 在训练过程中,随着 epoch 增加,模型可能会 收敛变慢 ,也就是训练准确度会提升,如果还是保持原有的 步长(step_size) 往往会使训练效果下降,为了提高模型训练精度需要后期小步调整, Loss 下降变慢时,自动降低学习率。 例如,前 10 轮学习率 0.01,第 10 轮后变 0.001,第20轮后变0.0001,依次类推
1 scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=10 , gamma=0.1 )
step_size=10
:每 10 个epoch让学习率衰减一次
gamma=0.1
:每次衰减时,学习率乘以0.1(缩小10倍)
5.损失函数
CrossEntropyLoss
是PyTorch 中一个常用的损失函数,专门用于 分类任务 ,尤其是多类别分类。它结合了 Softmax 函数和 负对数似然损失 (Negative Log-Likelihood Loss),用于处理分类任务中 概率输出 的问题
适用范围:真实标签应该是一个 整数 ,表示每个样本的真实类别索引(而不是 one-hot 编码)
1 criterion = nn.CrossEntropyLoss()
4.模型训练和评估 1.训练 train_one_epoch
方法定义了一个完整的训练轮次(epoch) ,包括 前向传播、损失计算、反向传播、参数更新 ,并计算 训练损失和准确率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def train_one_epoch (_model, dataloader, optimizer, _criterion, device, _scheduler ): """ 训练一个 epoch """ _model.train() running_loss = 0.0 running_corrects = 0 for inputs, labels in dataloader: inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() outputs = _model(inputs) _, predicted = torch.max (outputs, 1 ) loss = _criterion(outputs, labels) loss.backward() optimizer.step() running_loss += loss.item() * inputs.size(0 ) running_corrects += torch.sum ((predicted == labels).int ()) epoch_loss = running_loss / len (dataloader.dataset) epoch_acc = running_corrects / len (dataloader.dataset) return epoch_loss, epoch_acc
2.评估 validate_one_epoch
方法 执行了一次完整的验证(evaluation),其核心功能是 前向传播、计算损失、计算准确率 ,但不会进行 反向传播和参数更新 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def validate_one_epoch (_model, dataloader, _criterion, device ): """ 进行一个 epoch 的验证 """ _model.eval () running_loss = 0.0 running_corrects = 0 with torch.no_grad(): for inputs, labels in dataloader: inputs, labels = inputs.to(device), labels.to(device) outputs = _model(inputs) _, predicted = torch.max (outputs, 1 ) loss = _criterion(outputs, labels) running_loss += loss.item() * inputs.size(0 ) running_corrects += torch.sum ((predicted == labels).int ()) epoch_loss = running_loss / len (dataloader.dataset) epoch_acc = running_corrects / len (dataloader.dataset) return epoch_loss, epoch_acc
3.筛选与保存 train_model
方法就是一个完整的模型训练过程了,包括
训练多个 epoch(循环多轮数据集)
对每个 epoch 进行训练和验证
记录并打印损失、准确率、学习率变化
保存最佳模型权重到本地
在训练结束后,返回最佳模型参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 def train_model (_model, train_loader, valid_loader, _criterion, optimizer, _scheduler, num_epochs, device, _model_name ): """ 训练完整模型 return 最终模型、损失 & 准确率历史、学习率变化 """ start_time = time.time() best_acc = 0.0 best_model_wts = copy.deepcopy(_model.state_dict()) train_acc_history = [] train_losses = [] valid_acc_history = [] valid_losses = [] learning_rates = [optimizer.param_groups[0 ]['lr' ]] for epoch in range (num_epochs): print (f'Epoch:{epoch + 1 } /{num_epochs} ' ) train_loss, train_acc = train_one_epoch(_model, train_loader, optimizer, _criterion, device, _scheduler) train_losses.append(train_loss) train_acc_history.append(train_acc) valid_loss, valid_acc = validate_one_epoch(_model, valid_loader, _criterion, device) valid_losses.append(valid_loss) valid_acc_history.append(valid_acc) time_elapsed = time.time() - start_time print (f"Time elapsed: {time_elapsed // 60 :.0 f} m {time_elapsed % 60 :.0 f} s" ) print (f"Train Loss: {train_loss:.4 f} Acc: {train_acc:.4 f} " ) print (f"Valid Loss: {valid_loss:.4 f} Acc: {valid_acc:.4 f} " ) if valid_acc > best_acc: best_acc = valid_acc best_model_wts = copy.deepcopy(_model.state_dict()) state = { 'state_dict' : _model.state_dict(), 'best_acc' : best_acc, 'optimizer' : optimizer.state_dict(), } torch.save(state, model_name) print ('Optimizer learning rate : {:.7f}' .format (optimizer.param_groups[0 ]['lr' ])) learning_rates.append(optimizer.param_groups[0 ]['lr' ]) _scheduler.step() print ('-' * 100 ) time_elapsed = time.time() - start_time print (f'Training complete in {time_elapsed // 60 :.0 f} m {time_elapsed % 60 :.0 f} s' ) print (f'Best validation Acc: {best_acc:.4 f} ' ) _model.load_state_dict(best_model_wts) return _model, train_losses, train_acc_history, valid_losses, valid_acc_history, learning_rates
4.一次训练 一次训练20个epoch,训练完把模型保存到本地
1 train_model(model, train_dl, valid_dl, criterion, optimizer_ft, scheduler, 20 , get_device(), model_name)
结果输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ... ... ... --------------------------------------------------------------------------------- Epoch:19 /20 Time elapsed: 33m 37s Train Loss: 1.6936 Acc: 0.5911 Valid Loss: 3.2956 Acc: 0.3912 Optimizer learning rate : 0.0000100 --------------------------------------------------------------------------------- Epoch:20 /20 Time elapsed: 34m 52s Train Loss: 1.7088 Acc: 0.5882 Valid Loss: 3.2838 Acc: 0.3704 Optimizer learning rate : 0.0000100 --------------------------------------------------------------------------------- Training complete in 35m 1s Best validation Acc: 0.392421
训练完后当前模块文件夹下会有一个best.pt
刚训练好的模型文件,准备二次训练
5.二次训练 加载第一次训练的模型后,解冻模型的所有层,然后使用较小的学习率继续训练 10 轮,更新模型权重。这样做可以让模型达到更好的准确率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def second_train (_model_name, _train_dl, _valid_dl, _criterion ): _model = init_model(num_classes=102 ) checkpoint = torch.load(_model_name, weights_only=True ) _model.load_state_dict(checkpoint['state_dict' ]) for param in _model.parameters(): param.requires_grad = True _optimizer_next = optim.Adam(_model.parameters(), lr=1e-3 ) _scheduler_next = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7 , gamma=0.1 ) train_model(_model, _train_dl, _valid_dl, _criterion, _optimizer_next, _scheduler_next, 10 , get_device(), _model_name)
二次训练较一次模型训练结果有改善
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ... ... ... --------------------------------------------------------------------------------- Epoch:9 /10 Time elapsed: 16m 47s Train Loss: 0.6153 Acc: 0.8155 Valid Loss: 1.6921 Acc: 0.6051 Optimizer learning rate : 0.0010000 --------------------------------------------------------------------------------- Epoch:10 /10 Time elapsed: 18m 36s Train Loss: 0.5529 Acc: 0.8361 Valid Loss: 1.6800 Acc: 0.5941 Optimizer learning rate : 0.0010000 --------------------------------------------------------------------------------- Training complete in 18m 36s Best validation Acc: 0.6051
两轮训练完最终对于测试集的验证正确率是59.41%
5.模型测试 展示真实标签和模型预测的对比图,如果预测错误标题显示红色
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 def show_predicted (_model_name ): _model = init_model(num_classes=102 ) checkpoint = torch.load(_model_name, weights_only=True ) _model.load_state_dict(checkpoint['state_dict' ]) label_name = load_label_name() images, labels = next (iter (valid_dl)) _model.eval () output = _model(images) _, predicted_tensor = torch.max (output, 1 ) train_on_gpu = torch.cuda.is_available() predicted = np.squeeze(predicted_tensor.numpy()) if not train_on_gpu else np.squeeze(predicted_tensor.cpu().numpy()) fig, axes = plt.subplots(2 , 4 , figsize=(12 , 6 )) axes = axes.flatten() for i in range (8 ): img = denormalize(images[i]) axes[i].imshow(img) axes[i].axis("off" ) predicted_label = label_name.get(str (predicted[i]), "Unknown" ) true_label = label_name.get(str (labels[i].item()), "Unknown" ) title_color = "green" if predicted_label == true_label else "red" axes[i].set_title("{} ({})" .format (predicted_label, true_label), color=( title_color)) plt.tight_layout() plt.show()
6.总结 本文主要讲的是如何利用预训练模型(ResNet-18)在新数据集上做微调。先冻结大部分网络层,只留最后一层全连接层,再经过训练集训练后解冻全部参数,继续训练,以适应新数据集。 完整流程:
加载 ResNet-18 预训练模型
修改 fc 层,适配新任务
冻结特征层,仅训练 fc 层(特征提取模式)
设置优化器,仅更新 fc 层参数
使用 StepLR 进行学习率衰减
训练(前向传播 + 反向传播 + 梯度更新)
验证模型性能(不更新梯度)
保存最佳模型(防止过拟合)
微调(解冻全部参数,降低学习率)
可视化预测结果,检查分类准确率
7.模型优化 八张图片中有两张识别错误,模型的识别准确率还有提升的空间,如何要进一步提高准确率该怎样做?查了一下,大致可以有以下几种方法:
使用ResNet-50,ResNet-50由于参数更多,可以学习更复杂的特征,分类效果通常更好
采用更好的优化器,例如AdamW (更稳定,L2 正则化)、SGD + Momentum (更适合 Fine-tuning)等
优化学习率调度策略,例如ReduceLROnPlateau (监控验证集 Loss 下降才调整学习率)、CosineAnnealingLR (余弦退火,适用于 Fine-tuning)、OneCycleLR (加速收敛)等
冻结更多层后逐步解冻,先只训练 fc 层,再解冻 ResNet 最后 1-2 个 block,最后解冻整个 ResNet 进行 Fine-tuning
增加 Dropout 防止过拟合
使用 Label Smoothing,目前的 CrossEntropyLoss 过于严格,可以用 Label Smoothing ,防止过拟合
使用混合精度训练(加速 + 提高泛化能力)
使用 Early Stopping,训练更久的同时还能防止过拟合
8.备注 环境:
mac: 15.2
python: 3.12.4
pytorch: 2.5.1
matplotlib: 3.8.4
numpy: 1.26.4
数据集: https://github.com/keychankc/dl_code_for_blog/tree/main/004_cnn_classification_flowers/data
完整代码: https://github.com/keychankc/dl_code_for_blog/blob/main/004_cnn_classification_flowers/main.py