Fashion MNIST 是一个定位在比 MNIST 图片识别问题稍复杂的数据集,它的设定与MNIST 几乎完全一样,包含了 10 类不同类型的衣服、鞋子、包等灰度图片,图片大小为28 × 28,共 70000 张图片,其中 60000 张用于训练集, 10000 张用于测试集,每行是一种类别图片。 Fashion MNIST 除了图片内容与 MNIST 不一样,其它设定都相同,大部分情况可以直接替换掉原来基于 MNIST 训练的算法代码,而不需要额外修改。由于 Fashion MNIST 图片识别相对于 MNIST 图片更难,因此可以用于测试稍复杂的算法性能
自编码器算法原理非常简单,实现方便,训练也较稳定, 相对于 PCA 算法,神经网络的强大表达能力可以学习到输入的高层抽象的隐藏特征向量?,同时也能够基于?重建出输入。 这里基于Fashsion MNIST 数据集进行图片重建
一、数据集的加载以及预处理
# 加载Fashion MNIST数据集
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
# 归一化处理 x: [0, 1]
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
# 只需要通过图片数据即可构建训练数据集对象,不需要标签y
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batches * 5).batch(batches)
# 构建测试集对象
test_db = tf.data.Dataset.from_tensor_slices((x_test))
test_db = test_db.batch(batches)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)
AE数据集的加载只需要图片数据x即可,因为它的训练的督促信号由标签y变成自身x,但是在加载时还是需要将标签y加载进来
二、网络模型构建
利用编码器将输入图片? ∈ 降维到较低维度的隐藏向量: h∈ ,并基于隐藏向量h利用解码器重建图片
编码器由 3 层全连接层网络组成,输出节点数分别为 256、 128、 20,解码器同样由 3 层全连接网络组成,输出节点数分别为 128、 256、 784
编码器子网络的实现, 利用 3 层的神经网络将长度为 784 的图片向量数据依次降维到 256、 128, 最后降维到 h_dim 维度,每层使用 ReLU 激活函数,最后一层不使用激活函数
# Encodersself.encoder = Sequential([layers.Dense(256, activation=tf.nn.relu),layers.Dense(128, activation=tf.nn.relu),layers.Dense(h_dim)])
解码器子网络, 基于隐藏向量 h_dim 依次升维到 128、 256、 784 长度,除最后一层,激活函数使用 ReLU 函数。解码器的输出为 784 长度的向量,代表了打平后的28 × 28大小图片,通过Reshape 操作即可恢复为图片矩阵
# Decodersself.decoder = Sequential([layers.Dense(128, activation=tf.nn.relu),layers.Dense(256, activation=tf.nn.relu),layers.Dense(784)])
编码器和解码器 2 个子网络均实现在自编码器类 AE 中,在初始化函数中同时创建这两个子网络
class AE(keras.Model):def __init__(self):super(AE, self).__init__()# Encodersself.encoder = Sequential([layers.Dense(256, activation=tf.nn.relu),layers.Dense(128, activation=tf.nn.relu),layers.Dense(h_dim)])# Decodersself.decoder = Sequential([layers.Dense(128, activation=tf.nn.relu),layers.Dense(256, activation=tf.nn.relu),layers.Dense(784)])def call(self, inputs, training):# [b,784] -> [b, 10]h = self.encoder(inputs)# [b, 10] -> [b, 784]x_hat = self.decoder(h)return x_hat
在call()函数中完成前向传播过程,输入图片首先通过 encoder 子网络得到隐藏向量 h,再通过 decoder 得到重建图片。依次调用编码器和解码器的前向传播函数即可,
三、网络装配与训练
自编码器的训练过程与分类器的基本一致,通过误差函数计算出重建向量?与原始输入向量?之间的距离,再利用 TensorFlow 的自动求导机制同时求出 encoder 和 decoder 的梯度,循环更新即可
model = AE()
model.build(input_shape=(None, 784))
model.summary()optimizer = optimizers.Adam(lr=1e-3)for epoch in range(100):for step, x in enumerate(train_db):# [b,28,28] -> [b,784]x = tf.reshape(x, [-1, 784])# 构建梯度记录器with tf.GradientTape() as tape:# 前向计算获得重建的图片x_rec_logits = model(x)# x 与 重建的 x :重建图片与输入之间的损失函数rec_loss = tf.losses.binary_crossentropy(x, x_rec_logits, from_logits=True)rec_loss = tf.reduce_mean(rec_loss)grads = tape.gradient(rec_loss, model.trainable_variables)optimizer.apply_gradients(zip(grads, model.trainable_variables))if step % 100 == 0:print(epoch, step, float(rec_loss))
首先创建自编码器实例和优化器,并设置合适的学习率。每次通过前向计算获得重建图片向量,并利用tf.nn.sigmoid_cross_entropy_with_logits 损失函数计算重建图片与原始图片直接的误差,实
际上利用 MSE 误差函数也是可行的
四、测试(图片重建)
从测试集中随机采样测试图片? ∈ ?test, 经过自编码器计算得到重建后的图片,然后将真实图片与重建图片保存为图片阵列, 并可视化, 方便比对
# 重建图片,从测试集采样一批图片x = next(iter(test_db))# 打平并送入自编码器logits = model(tf.reshape(x, [-1, 784]) )x_hat = tf.sigmoid(logits) # 将输出转换为像素值,使用sigmoid# [b,784] -> [b,28,28]x_hat = tf.reshape(x_hat, [-1, 28, 28])# 输入的前 50 张+重建的前 50 张图片合并, [b, 28, 28] => [2b, 28, 28]x_concat = tf.concat([x[: 50], x_hat[:50]], axis=0)x_concat = x_concat.numpy() * 255.x_concat = x_concat.astype(np.uint8)save_image(x_concat, 'ae_images/rec_epoch_%d.png' % epoch)
save_images函数负责将多张图片合并并保存为一张大图
def save_image(imgs, name):# 创建 280x280 大小图片阵列new_im = Image.new('L', (280, 280))index = 0for i in range(0, 280, 28): # 10 行图片阵列for j in range(0, 280, 28): # 10 列图片阵列im = imgs[index]im = Image.fromarray(im, mode='L')new_im.paste(im, (i, j)) # 写入对应位置index += 1# 保存图片阵列new_im.save(name)
结果:
其中每张图片的左边 5 列为真实图片,右边 5 列为对应的重建图片
五、程序
# -*- codeing = utf-8 -*-
# @Time : 23:37
# @Author:Paranipd
# @File : AE_test.py
# @Software:PyCharmimport os
import tensorflow as tf
import numpy as np
from tensorflow import keras
from PIL import Image
from matplotlib import pyplot as plt
from tensorflow.keras import datasets, Sequential, layers, metrics, optimizers, lossestf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2')def save_image(imgs, name):# 创建 280x280 大小图片阵列new_im = Image.new('L', (280, 280))index = 0for i in range(0, 280, 28): # 10 行图片阵列for j in range(0, 280, 28): # 10 列图片阵列im = imgs[index]im = Image.fromarray(im, mode='L')new_im.paste(im, (i, j)) # 写入对应位置index += 1# 保存图片阵列new_im.save(name)h_dim = 20
batches = 512
# 加载Fashion MNIST数据集
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
# 归一化处理 x: [0, 1]
x_train, x_test = x_train.astype(np.float32) / 255., x_test.astype(np.float32) / 255.
# 只需要通过图片数据即可构建训练数据集对象,不需要标签y
train_db = tf.data.Dataset.from_tensor_slices(x_train)
train_db = train_db.shuffle(batches * 5).batch(batches)
# 构建测试集对象
test_db = tf.data.Dataset.from_tensor_slices((x_test))
test_db = test_db.batch(batches)
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)class AE(keras.Model):def __init__(self):super(AE, self).__init__()# Encodersself.encoder = Sequential([layers.Dense(256, activation=tf.nn.relu),layers.Dense(128, activation=tf.nn.relu),layers.Dense(h_dim)])# Decodersself.decoder = Sequential([layers.Dense(128, activation=tf.nn.relu),layers.Dense(256, activation=tf.nn.relu),layers.Dense(784)])def call(self, inputs, training):# [b,784] -> [b, 10]h = self.encoder(inputs)# [b, 10] -> [b, 784]x_hat = self.decoder(h)return x_hatmodel = AE()
model.build(input_shape=(None, 784))
model.summary()optimizer = optimizers.Adam(lr=1e-3)for epoch in range(100):for step, x in enumerate(train_db):# [b,28,28] -> [b,784]x = tf.reshape(x, [-1, 784])# 构建梯度记录器with tf.GradientTape() as tape:# 前向计算获得重建的图片x_rec_logits = model(x)# x 与 重建的 x :重建图片与输入之间的损失函数rec_loss = tf.losses.binary_crossentropy(x, x_rec_logits, from_logits=True)rec_loss = tf.reduce_mean(rec_loss)grads = tape.gradient(rec_loss, model.trainable_variables)optimizer.apply_gradients(zip(grads, model.trainable_variables))if step % 100 == 0:print(epoch, step, float(rec_loss))# 评估# 重建图片,从测试集采样一批图片x = next(iter(test_db))# 打平并送入自编码器logits = model(tf.reshape(x, [-1, 784]) )x_hat = tf.sigmoid(logits) # 将输出转换为像素值,使用sigmoid# [b,784] -> [b,28,28]x_hat = tf.reshape(x_hat, [-1, 28, 28])# 输入的前 50 张+重建的前 50 张图片合并, [b, 28, 28] => [2b, 28, 28]x_concat = tf.concat([x[: 50], x_hat[:50]], axis=0)x_concat = x_concat.numpy() * 255.x_concat = x_concat.astype(np.uint8)save_image(x_concat, 'ae_images/rec_epoch_%d.png' % epoch)