Implement the 18-layer deep residual network ResNet18 and train and test it on the CIFAR10 image dataset. The standard ResNet18 takes 224 × 224 images as input; here we make small adjustments so that it accepts 32 × 32 inputs and produces a 10-dimensional output. The structure of the adjusted ResNet18 is shown in the figure below.
I. Dataset Loading and Preprocessing
def preprocess(x, y):
    # Map the image data to the range [-1, 1]
    x = 2 * tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32)  # type conversion
    return x, y

(x, y), (x_test, y_test) = load.load_data()  # load the dataset
y = tf.squeeze(y, axis=1)            # remove the unnecessary dimension
y_test = tf.squeeze(y_test, axis=1)  # remove the unnecessary dimension
print(x.shape, y.shape, x_test.shape, y_test.shape)

train_db = tf.data.Dataset.from_tensor_slices((x, y))  # build the training set
# Shuffle, preprocess, and batch
train_db = train_db.shuffle(1000).map(preprocess).batch(512)

test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))  # build the test set
# Preprocess and batch
test_db = test_db.map(preprocess).batch(512)

# Sample one batch
sample = next(iter(train_db))
The dataset handling is the same as before. Since downloading the dataset online did not work, a custom load function is used to load the data. After the dataset is loaded, the training and test sets are built and the necessary type conversions are applied; the training set is shuffled in addition to being batched.
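If CIFAR10 can be downloaded in your environment, the custom load module could be replaced by the built-in Keras loader. A minimal sketch of such a stand-in, assuming network access to the dataset (load_data here is only a hypothetical replacement with the same return signature):

# Hypothetical drop-in replacement for the custom load module,
# assuming the CIFAR10 download via keras.datasets works in your environment.
from tensorflow.keras import datasets

def load_data():
    # Returns (x, y), (x_test, y_test) with shapes
    # (50000, 32, 32, 3), (50000, 1), (10000, 32, 32, 3), (10000, 1)
    return datasets.cifar10.load_data()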
II. Building the Network Model
1. BasicBlock
First, implement the residual module, which consists of the two convolution layers in the middle and the 1×1 convolution layer on the skip connection.
class BasicBlock(layers.Layer):
    # Residual module
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # First convolution unit
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # Second convolution unit
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        # padding is 'same', so the output shape equals the input shape only when stride=1;
        # otherwise the height and width shrink to 1/stride of the input
        if stride != 1:  # match shapes with a 1x1 convolution
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:  # shapes already match, use an identity shortcut
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # [b, h, w, c], pass through the first convolution unit
        out = self.conv1(inputs)
        out = self.bn1(out)
        out = self.relu(out)
        # Pass through the second convolution unit
        out = self.conv2(out)
        out = self.bn2(out)
        # Pass the input through the identity (shortcut) branch
        identity = self.downsample(inputs)
        # Add the outputs of the two branches element-wise
        output = layers.add([out, identity])
        output = tf.nn.relu(output)  # activation
        return output
With build_resblock, several residual modules can be created at once.
def build_resblock(self, filter_num, blocks, stride=1):
    # Helper that stacks `blocks` BasicBlocks with `filter_num` filters each
    res_blocks = Sequential()
    # Only the first BasicBlock may use a stride other than 1, to perform downsampling
    res_blocks.add(BasicBlock(filter_num, stride))
    for _ in range(1, blocks):  # the remaining BasicBlocks all use stride 1
        res_blocks.add(BasicBlock(filter_num, stride=1))
    return res_blocks
2. ResNet
When designing a deep convolutional network, the usual rule of thumb is that the feature map height and width h/w gradually decrease while the number of channels c gradually increases. High-level features can therefore be extracted by stacking ResBlocks whose channel counts grow progressively. We now implement a general ResNet model.
# General ResNet implementation class
class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=10):
        # layer_dims: e.g. [2, 2, 2, 2] -> 4 ResBlocks, each containing two BasicBlocks
        # num_classes: number of output classes
        super(ResNet, self).__init__()
        # Root (stem) network for preprocessing
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')])
        # Stack 4 ResBlocks; each contains several BasicBlocks and uses a different stride.
        # Rule of thumb: feature map height/width gradually decrease, channels gradually increase.
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # Output: [b, h, w, 512]; h and w are not known in advance.
        # Global average pooling reduces the height and width to 1x1,
        # which plays the role of flattening before the fully connected layer.
        self.avgpool = layers.GlobalAveragePooling2D()
        # Final fully connected layer for classification
        self.fc = layers.Dense(num_classes)

    # Forward computation
    def call(self, inputs, training=None):
        # Pass through the stem network
        x = self.stem(inputs)
        # Pass through the 4 ResBlocks in turn
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # Pass through the pooling layer
        x = self.avgpool(x)
        # Pass through the fully connected layer
        x = self.fc(x)
        return x
Different ResNets are produced by adjusting the number of stacked BasicBlocks and their channel counts. For example, the channel configuration 64-64-128-128-256-256-512-512, i.e. 8 BasicBlocks in total, yields the ResNet18 model. Each BasicBlock contains 2 main convolution layers, giving 8 × 2 = 16 convolution layers; together with the convolution layer in the stem and the fully connected layer at the end of the network, this makes 18 layers.
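As a quick sanity check of this configuration, the sketch below (assuming the resnet.py module listed later in this post) builds resnet18() and prints the feature map shape after each of the four stages, so the 64 → 128 → 256 → 512 channel growth and the shrinking spatial size can be observed directly:

import tensorflow as tf
from resnet import resnet18   # the resnet.py module given at the end of this post

model = resnet18()
x = tf.random.normal([4, 32, 32, 3])   # a dummy batch of CIFAR10-sized images
out = model.stem(x)                    # stem: 3x3 conv + BN + ReLU + pooling
for i, stage in enumerate([model.layer1, model.layer2, model.layer3, model.layer4], 1):
    out = stage(out)
    print('layer%d output:' % i, out.shape)   # channels grow 64 -> 128 -> 256 -> 512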
Note that fully connected layers are usually placed after the convolution layers to vectorize the features, but a fully connected layer has one serious weakness: its parameter count is very large, especially for the layer connected to the last convolution layer. On one hand this increases the amount of computation during training and testing and slows things down; on the other hand, so many parameters make overfitting likely, and even measures such as dropout cannot fully compensate for this.
A fully connected layer first flattens the convolutional output into a vector and then classifies it; Global Average Pooling accomplishes both steps at once by reducing each feature map to a single value.
Therefore, GlobalAveragePooling2D is applied right after the convolution layers here, and only then is the fully connected layer attached. Global average pooling is a layer used very often in deep neural networks; it maps a tensor of shape [B, H, W, C] to [B, C].
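A minimal sketch of this behavior (the shapes here assume the [b, 4, 4, 512] feature maps that the last stage produces for 32 × 32 inputs):

import tensorflow as tf
from tensorflow.keras import layers

feat = tf.random.normal([2, 4, 4, 512])           # [B, H, W, C] feature maps
gap = layers.GlobalAveragePooling2D()
print(gap(feat).shape)                            # (2, 512): each map averaged to one value
print(tf.reduce_mean(feat, axis=[1, 2]).shape)    # the equivalent reduction written by hand

The saving in parameters is easy to see: flattening a 4 × 4 × 512 feature map and feeding it to Dense(10) needs 4 · 4 · 512 · 10 + 10 = 81,930 weights, whereas GAP followed by Dense(10) needs only 512 · 10 + 10 = 5,130.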
Create the ResNet18 and ResNet34 models:
def resnet18():
    # Different ResNets are obtained by adjusting the number and configuration of BasicBlocks
    return ResNet([2, 2, 2, 2])

def resnet34():
    # Different ResNets are obtained by adjusting the number and configuration of BasicBlocks
    return ResNet([3, 4, 6, 3])
III. Assembling the Network
After the network model has been built, the optimizer, the loss function, the evaluation metrics and other settings have to be specified; this step is called assembling the network.
model = resnet18()  # ResNet18 network
model.build(input_shape=(None, 32, 32, 3))
model.summary()  # print a summary of the network parameters
optimizer = optimizers.Adam(learning_rate=1e-4)  # create the optimizer
The optimizer is used mainly through its apply_gradients method, which is given the variables together with their corresponding gradients and performs an update step on those variables.
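A tiny standalone sketch of apply_gradients on a single toy variable, just to show the (gradient, variable) pairing that the training loop below relies on:

import tensorflow as tf

w = tf.Variable(2.0)                               # a single trainable variable
opt = tf.keras.optimizers.Adam(learning_rate=1e-4)
with tf.GradientTape() as tape:
    loss = w * w                                   # toy loss with gradient 2w
grads = tape.gradient(loss, [w])                   # list of gradients, one per variable
opt.apply_gradients(zip(grads, [w]))               # pair gradients with variables and update
print(float(w))                                    # w has moved slightly towards lower loss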
IV. Computing Gradients and the Loss, and Updating Parameters
To compute gradients with automatic differentiation, the forward computation has to be placed inside a tf.GradientTape() context; the gradient() method of the tape object then computes the parameter gradients automatically, and the optimizer object is used to update the parameters.
with tf.GradientTape() as tape:
    # [b, 32, 32, 3] => [b, 10], forward pass
    logits = model(x, training=True)
    # [b] => [b, 10], one-hot encoding
    y_onehot = tf.one_hot(y, depth=10)
    # Compute the cross-entropy loss
    loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
    loss = tf.reduce_mean(loss)
# Compute the gradients
grads = tape.gradient(loss, model.trainable_variables)
# Update the network parameters
optimizer.apply_gradients(zip(grads, model.trainable_variables))

if step % 50 == 0:
    print(epoch, step, 'loss:', float(loss))
V. Testing
In the testing phase, since no gradient information needs to be recorded, the code generally does not have to be placed inside a with tf.GradientTape() as tape context. Passing the forward-pass output through the softmax function gives the probability that the network assigns to the current input image x belonging to each class i, P(y = i | x) with i ∈ [0, 9]. The argmax function selects the index of the largest probability as the predicted class of x, which is compared with the ground-truth label y; summing the number of True values in the comparison counts the correctly predicted samples, and dividing by the total number of samples gives the test accuracy of the network.
total_num = 0
total_correct = 0
for x, y in test_db:
    logits = model(x, training=False)
    prob = tf.nn.softmax(logits, axis=1)
    pred = tf.argmax(prob, axis=1)
    pred = tf.cast(pred, dtype=tf.int32)
    correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
    correct = tf.reduce_sum(correct)
    total_num += x.shape[0]
    total_correct += int(correct)

acc = total_correct / total_num
print(epoch, 'acc:', acc)
Test results:
After running only 2 epochs the accuracy is still fairly low; a full training run should reach roughly 80%.
Complete Program
resnet.py
# -*- coding: utf-8 -*-
# @Time : 13:21
# @Author: Paranipd
# @File : resnet.py
# @Software: PyCharm

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Sequential


class BasicBlock(layers.Layer):
    # Residual module
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # First convolution unit
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # Second convolution unit
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        # padding is 'same', so the output shape equals the input shape only when stride=1;
        # otherwise the height and width shrink to 1/stride of the input
        if stride != 1:  # match shapes with a 1x1 convolution
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:  # shapes already match, use an identity shortcut
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # [b, h, w, c], pass through the first convolution unit
        out = self.conv1(inputs)
        out = self.bn1(out)
        out = self.relu(out)
        # Pass through the second convolution unit
        out = self.conv2(out)
        out = self.bn2(out)
        # Pass the input through the identity (shortcut) branch
        identity = self.downsample(inputs)
        # Add the outputs of the two branches element-wise
        output = layers.add([out, identity])
        output = tf.nn.relu(output)  # activation
        return output


# General ResNet implementation class
class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=10):
        # layer_dims: e.g. [2, 2, 2, 2] -> 4 ResBlocks, each containing two BasicBlocks
        # num_classes: number of output classes
        super(ResNet, self).__init__()
        # Root (stem) network for preprocessing
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')])
        # Stack 4 ResBlocks; each contains several BasicBlocks and uses a different stride.
        # Rule of thumb: feature map height/width gradually decrease, channels gradually increase.
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # Output: [b, h, w, 512]; h and w are not known in advance.
        # Global average pooling reduces the height and width to 1x1,
        # which plays the role of flattening before the fully connected layer.
        self.avgpool = layers.GlobalAveragePooling2D()
        # Final fully connected layer for classification
        self.fc = layers.Dense(num_classes)

    # Forward computation
    def call(self, inputs, training=None):
        # Pass through the stem network
        x = self.stem(inputs)
        # Pass through the 4 ResBlocks in turn
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # Pass through the pooling layer
        x = self.avgpool(x)
        # Pass through the fully connected layer
        x = self.fc(x)
        return x

    def build_resblock(self, filter_num, blocks, stride=1):
        # Helper that stacks `blocks` BasicBlocks with `filter_num` filters each
        res_blocks = Sequential()
        # Only the first BasicBlock may use a stride other than 1, to perform downsampling
        res_blocks.add(BasicBlock(filter_num, stride))
        for _ in range(1, blocks):  # the remaining BasicBlocks all use stride 1
            res_blocks.add(BasicBlock(filter_num, stride=1))
        return res_blocks


def resnet18():
    # Different ResNets are obtained by adjusting the number and configuration of BasicBlocks
    return ResNet([2, 2, 2, 2])


def resnet34():
    # Different ResNets are obtained by adjusting the number and configuration of BasicBlocks
    return ResNet([3, 4, 6, 3])
main.py
# -*- coding: utf-8 -*-
# @Time : 14:22
# @Author: Paranipd
# @File : resnet18_train.py
# @Software: PyCharm

import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
import os
from resnet import resnet18
import load

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
tf.random.set_seed(2345)


def preprocess(x, y):
    # Map the image data to the range [-1, 1]
    x = 2 * tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32)  # type conversion
    return x, y


(x, y), (x_test, y_test) = load.load_data()  # load the dataset
y = tf.squeeze(y, axis=1)            # remove the unnecessary dimension
y_test = tf.squeeze(y_test, axis=1)  # remove the unnecessary dimension
print(x.shape, y.shape, x_test.shape, y_test.shape)

train_db = tf.data.Dataset.from_tensor_slices((x, y))  # build the training set
# Shuffle, preprocess, and batch
train_db = train_db.shuffle(1000).map(preprocess).batch(512)

test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))  # build the test set
# Preprocess and batch
test_db = test_db.map(preprocess).batch(512)

# Sample one batch
sample = next(iter(train_db))


def main():
    # [b, 32, 32, 3] => [b, 1, 1, 512]
    model = resnet18()  # ResNet18 network
    model.build(input_shape=(None, 32, 32, 3))
    model.summary()  # print a summary of the network parameters
    optimizer = optimizers.Adam(learning_rate=1e-4)  # create the optimizer

    for epoch in range(100):  # training epochs
        for step, (x, y) in enumerate(train_db):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 10], forward pass
                logits = model(x, training=True)
                # [b] => [b, 10], one-hot encoding
                y_onehot = tf.one_hot(y, depth=10)
                # Compute the cross-entropy loss
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)
            # Compute the gradients
            grads = tape.gradient(loss, model.trainable_variables)
            # Update the network parameters
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            if step % 50 == 0:
                print(epoch, step, 'loss:', float(loss))

        # Evaluate on the test set after each epoch
        total_num = 0
        total_correct = 0
        for x, y in test_db:
            logits = model(x, training=False)
            prob = tf.nn.softmax(logits, axis=1)
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)
            total_num += x.shape[0]
            total_correct += int(correct)

        acc = total_correct / total_num
        print(epoch, 'acc:', acc)


if __name__ == '__main__':
    main()