深度学习之卷积神经网络 CIFAR10与ResNet18实战

Posted 嘟粥yyds

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度学习之卷积神经网络 CIFAR10与ResNet18实战相关的知识,希望对你有一定的参考价值。

目录

开发环境

ResNet 原理

网络结构

 导入所需模块并设置GPU显存占用

 BasicBlock

ResNet网络模型

ResNet18

完整代码实现

 运行结果

利用TensorBoard可视化运行结果

开发环境

作者:嘟粥yyds
时间:2023年2月8日
集成开发工具:jupyter notebook 6.5.2
集成开发环境:Python 3.10.6
第三方库:tensorflow-gpu 2.9.0

ResNet 原理

ResNet 通过在卷积层的输入和输出之间添加 Skip Connection 实现层数回退机制, 输入 𝒙 通过两个卷积层,得到特征变换后的输出 ℱ(𝒙) ,与输入 𝒙进行对应元素的相加运算,得到最终输出 ℋ(𝒙)                                                 ℋ(𝒙) = 𝒙 + ℱ(𝒙) ℋ(𝒙) 叫作残差模块 (Residual Block ,简称 ResBlock) 。由于被 Skip Connection 包围的卷积神经网络需要学习映射 ℱ(𝒙) = ℋ(𝒙) − 𝒙 ,故称为残差网络。 为了能够满足输入 𝒙 与卷积层的输出 ℱ(𝒙) 能够相加运算,需要输入 𝒙 shape ℱ(𝒙)的 shape 完全一致。当出现 shape 不一致时,一般通过在 Skip Connection 上添加额外的卷积运 算环节将输入 𝒙 变换到与 ℱ(𝒙) 相同的 shape ,如下图 identity(𝒙)函数所示,其中 identity(𝒙) 以1 × 1 的卷积运算居多,主要用于调整输入的通道数。

下图 对比了 34 层的深度残差网络、 34 层的普通深度网络以及 19 层的 VGG 网 络结构。可以看到,深度残差网络通过堆叠残差模块,达到了较深的网络层数,从而获得了训练稳定、性能优越的深层网络模型。

网络结构

标准的 ResNet18 接受输入为 22 × 22 大小的图片数据,我们将 ResNet18 进行适量调整,使得它输入大小为 32 × 32 ,输出维度为 10 。调整后的 ResNet18 网络结构如图一所示。

 导入所需模块并设置GPU显存占用

import tensorflow as tf
from tensorflow.keras import layers, Sequential, losses, optimizers

# 若不支持gpu则该设置跳过
gpus = tf.config.experimental.list_physical_devices("GPU")
if gpus:
    try:
        # 设置GPU显存占用为按需分配,增长式
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

 BasicBlock

深度残差网络并没有增加新的网络层类型,只是通过在输入和输出之间添加一条 Skip Connection ,因此并没有针对 ResNet 的底层实现。在 TensorFlow 中通过调用普通卷积层即可实现残差模块。
# 首先实现中间两个卷积层,Skip Connection 1x1 卷积层的残差模块
class BasicBlock(tf.keras.layers.Layer):
    # 残差模块
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # 第一个卷积单元
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # 第二个卷积单元
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        if stride != 1:  # 通过 1x1 卷积完成 shape 匹配
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:  # shape 匹配,直接短接
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # 前向计算函数
        # [b, h, w, c],通过第一个卷积单元
        out = self.conv1(inputs)
        out = self.bn1(out)
        out = self.relu(out)
        # 通过第二个卷积单元
        out = self.conv2(out)
        out = self.bn2(out)
        # 通过 identity 模块
        identity = self.downsample(inputs)
        # 2 条路径输出直接相加
        output = layers.add([out, identity])
        output = tf.nn.relu(output)  # 通过激活函数
        return output
在设计深度卷积 神经网络,一般按照特征图高宽ℎ/𝑤 逐渐减少,通道数 𝑐逐渐增大的 经验法则。可以通过堆叠通道数逐渐增大的 Res Block 来实现高层特征的提取,通过 build_resblock 可以一次完成多个残差模块的新建。
def build_resblock(self, filter_num, blocks, stride=1):
    # 辅助函数,堆叠 filter_num 个 BasicBlock
    res_blocks = Sequential()
    # 只有第一个 BasicBlock 的步长可能不为 1,实现下采样
    res_blocks.add(BasicBlock(filter_num, stride))
    for _ in range(1, blocks):  # 其他 BasicBlock 步长都为 1
        res_blocks.add(BasicBlock(filter_num, stride=1))
    return res_blocks

ResNet网络模型

下面我们来实现通用的ResNet网络模型。代码如下:

# 实现ResNet网络模型
class ResNet(tf.keras.Model):
    # 通用的 ResNet 实现类
    def __init__(self, layer_dims, num_classes=10):  # [2, 2, 2, 2]
        super(ResNet, self).__init__()
        # 根网络,预处理
        self.stem = Sequential([
            layers.Conv2D(64, (3, 3), strides=(1, 1)),
            layers.BatchNormalization(),
            layers.Activation('relu'),
            layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
        ])
        # 堆叠 4 个 Block,每个 Block 包含了多个 BasicBlock,设置步长不一样
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # 通过 Pooling 层将高宽降低为 1x1
        self.avgpool = layers.GlobalAveragePooling2D()
        # 最后连接一个全连接层分类
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        x = self.stem(inputs)
        # 一次通过 4 个模块
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # 通过池化层
        x = self.avgpool(x)
        # 通过全连接层
        x = self.fc(x)
        return x

ResNet18

        通过调整每个Res Block的堆叠数量和通道数可以产生不同的ResNet,如通过64-64-128-128-256-256-512-512通道数配置,共8个Res Block,可得到ResNet18的网络模型。每个ResBlock包含了2个主要的卷积层,因此卷积层数量是8 ⋅ 2 = 16,加上网络末尾的全连接层,共18层。创建ResNet18(图二)和ResNet34可以简单实现如下:

def resnet18():
    # 通过调整模块内部BasicBlock的数量和配置实现不同的ResNet
    return ResNet([2, 2, 2, 2])


def resnet34():
    # 通过调整模块内部BasicBlock的数量和配置实现不同的ResNet
    return ResNet([3, 4, 6, 3])

完整代码实现

import tensorflow as tf
from tensorflow.keras import layers, Sequential, losses, optimizers

# 不支持gpu可跳过该代码段
gpus = tf.config.experimental.list_physical_devices("GPU")
if gpus:
    try:
        # 设置GPU显存占用为按需分配,增长式
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)


# 首先实现中间两个卷积层,Skip Connection 1x1 卷积层的残差模块
class BasicBlock(tf.keras.layers.Layer):
    # 残差模块
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # 第一个卷积单元
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # 第二个卷积单元
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        if stride != 1:  # 通过 1x1 卷积完成 shape 匹配
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:  # shape 匹配,直接短接
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # 前向计算函数
        # [b, h, w, c],通过第一个卷积单元
        out = self.conv1(inputs)
        out = self.bn1(out)
        out = self.relu(out)
        # 通过第二个卷积单元
        out = self.conv2(out)
        out = self.bn2(out)
        # 通过 identity 模块
        identity = self.downsample(inputs)
        # 2 条路径输出直接相加
        output = layers.add([out, identity])
        output = tf.nn.relu(output)  # 通过激活函数
        return output


# 实现ResNet网络模型
class ResNet(tf.keras.Model):
    # 通用的 ResNet 实现类
    def __init__(self, layer_dims, num_classes=10):  # [2, 2, 2, 2]
        super(ResNet, self).__init__()
        # 根网络,预处理
        self.stem = Sequential([
            layers.Conv2D(64, (3, 3), strides=(1, 1)),
            layers.BatchNormalization(),
            layers.Activation('relu'),
            layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
        ])
        # 堆叠 4 个 Block,每个 Block 包含了多个 BasicBlock,设置步长不一样
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # 通过 Pooling 层将高宽降低为 1x1
        self.avgpool = layers.GlobalAveragePooling2D()
        # 最后连接一个全连接层分类
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        x = self.stem(inputs)
        # 一次通过 4 个模块
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # 通过池化层
        x = self.avgpool(x)
        # 通过全连接层
        x = self.fc(x)
        return x

    def build_resblock(self, filter_num, blocks, stride=1):
        # 辅助函数,堆叠 filter_num 个 BasicBlock
        res_blocks = Sequential()
        # 只有第一个 BasicBlock 的步长可能不为 1,实现下采样
        res_blocks.add(BasicBlock(filter_num, stride))
        for _ in range(1, blocks):  # 其他 BasicBlock 步长都为 1
            res_blocks.add(BasicBlock(filter_num, stride=1))
        return res_blocks


def resnet18():
    # 通过调整模块内部BasicBlock的数量和配置实现不同的ResNet
    return ResNet([2, 2, 2, 2])


def resnet34():
    # 通过调整模块内部BasicBlock的数量和配置实现不同的ResNet
    return ResNet([3, 4, 6, 3])


# 数据预处理
def preprocess(x, y):
    # 将x缩放到区间[-1, 1]上
    x = 2 * tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32)
    return x, y


if __name__ == '__main__':
    # 在线下载,加载 CIFAR10 数据集
    (x, y), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    y = tf.squeeze(y, axis=1)  # 删除不必要的维度
    y_test = tf.squeeze(y_test, axis=1)
    print(f"x:x.shape  y:y.shape\\nx_test:x_test.shape  y_test:y_test.shape")
    # 构建数据集
    train_db = tf.data.Dataset.from_tensor_slices((x, y))
    test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    # 随机打散、预处理、批量化(batch值过大可能导致gpu无法进行训练)
    train_db = train_db.shuffle(1000).map(preprocess).batch(128)
    test_db = test_db.map(preprocess).batch(128)
    # 采样一个样本
    sample = next(iter(train_db))
    print('sample:', sample[0].shape, sample[1].shape,
          tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))
    model = resnet18()
    model.build(input_shape=(None, 32, 32, 3))
    # 打印网络参数信息
    print(model.summary())
    optimizer = optimizers.Adam(learning_rate=1e-4)
    # 创建TensorBorad环境(保存路径因人而异)
    log_dir = './CIFAR10_ResNet18_logs'
    summary_writer = tf.summary.create_file_writer(log_dir)
    # 开始训练
    for epoch in range(50):  # 训练50个epoch(batch值为128的情况下大约耗时50mins)
        for step, (x1, y1) in enumerate(train_db):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3]  =>  [b, 10]  前向传播
                logits = model(x1)
                # [b]  =>  [b, 10]  one_hot编码
                y_onehot = tf.one_hot(y1, depth=10)
                # 计算交叉熵
                loss = losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)
                # 计算梯度信息
                grads = tape.gradient(loss, model.trainable_variables)
                # 更新网络参数
                optimizer.apply_gradients(zip(grads, model.trainable_variables))
                if step % 100 == 0:
                    correct, total = 0, 0
                    for x2, y2 in test_db:
                        out = model(x2)
                        preds = tf.nn.softmax(out, axis=1)
                        preds = tf.argmax(preds, axis=1)
                        preds = tf.cast(preds, dtype=tf.int32)
                        correct += tf.reduce_sum(tf.cast(tf.equal(preds, y2), dtype=tf.int32))
                        total += x2.shape[0]
                    acc = float(correct / total)
                    print('epoch:', epoch, ' step:', step, ' loss:', float(loss), ' acc:', acc)
                    with summary_writer.as_default():
                        tf.summary.scalar('loss', loss, step=step)
                        tf.summary.scalar('acc', acc, step=step)

 运行结果

x:(50000, 32, 32, 3)  y:(50000,)
x_test:(10000, 32, 32, 3)  y_test:(10000,)
sample: (128, 32, 32, 3) (128,) tf.Tensor(-1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Model: "res_net"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 sequential (Sequential)     (None, 30, 30, 64)        2048      
                                                                 
 sequential_1 (Sequential)   (None, 30, 30, 64)        148736    
                                                                 
 sequential_2 (Sequential)   (None, 15, 15, 128)       526976    
                                                                 
 sequential_4 (Sequential)   (None, 8, 8, 256)         2102528   
                                                                 
 sequential_6 (Sequential)   (None, 4, 4, 512)         8399360   
                                                                 
 global_average_pooling2d (G  multiple                 0         
 lobalAveragePooling2D)                                          
                                                                 
 dense (Dense)               multiple                  5130      
                                                                 
=================================================================
Total params: 11,184,778
Trainable params: 11,176,970
Non-trainable params: 7,808
_________________________________________________________________

利用TensorBoard可视化运行结果

首先cmd打开命令行,激活你的代码环境,进去你上一步复制的路径,盘符跳转就打个盘号加冒号就行,然后cd 你复制的路径 

 之后会生成一个网址,在屏幕下方,类似于http:/你的电脑名/6006/,复制到浏览器打开,最好用谷歌或者火狐,然后就能查看生成的各种可视化结果了。

若使用的集成开发工具是Pycharm,则可以直接在Pycharm里的终端执行

ResNet18 的网络参数量共 1100 万个,经过 50 Epoch 后,网络的准确率达到了 79.3%。我们这里的实战代码比较精简,在精挑超参数、数据增强等手段加持下,准确率可以达到更高。

以上是关于深度学习之卷积神经网络 CIFAR10与ResNet18实战的主要内容,如果未能解决你的问题,请参考以下文章

深度学习之四:卷积神经网络基础

深度学习之卷积神经网络(CNN)学习

Pytorch 深度卷积网络在 CIFAR10 上不收敛

从软件工程的角度写机器学习6——深度学习之卷积神经网络(CNN)实现

深度学习之卷积神经网络(CNN)

深度学习之 TensorFlow:卷积神经网络