Python: A Denoising Autoencoder Implementation in TensorFlow
This post walks through a complete denoising autoencoder implemented in TensorFlow, split across a configuration module, a utilities module, a command-line training script and the autoencoder class itself; hopefully it is a useful reference. Note that the code targets the pre-1.0 TensorFlow API (tf.app.flags, tf.merge_all_summaries, tf.initialize_all_variables and friends).
# --- zconfig.py: shared directory configuration (imported below as zconfig) ---
models_dir = 'models/'   # dir to save/restore models
data_dir = 'data/'       # directory to store algorithm data
summary_dir = 'logs/'    # directory to store tensorflow summaries
# --- utils.py: helper functions (imported below as utils) ---
from scipy import misc  # note: scipy.misc.imsave was removed in SciPy 1.2; use imageio with newer versions
import tensorflow as tf
import numpy as np
# ############# #
#   Utilities   #
# ############# #
def xavier_init(fan_in, fan_out, const=1):
    """ Xavier initialization of network weights.
    https://stackoverflow.com/questions/33640581/how-to-do-xavier-initialization-on-tensorflow
    :param fan_in: fan in of the network (n_features)
    :param fan_out: fan out of the network (n_components)
    :param const: multiplicative constant
    """
    low = -const * np.sqrt(6.0 / (fan_in + fan_out))
    high = const * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out), minval=low, maxval=high)
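For intuition (my addition, not part of the original file): at the default layer sizes used later in this post, the uniform bound is small, so weights start near zero and symmetric around it:

    bound = np.sqrt(6.0 / (784 + 256))   # fan_in=784 (MNIST pixels), fan_out=256 (hidden units)
    print(round(bound, 3))               # 0.076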
def gen_batches(data, batch_size):
    """ Divide input data into batches.
    :param data: input data
    :param batch_size: size of each batch
    :return: data divided into batches
    """
    data = np.array(data)
    for i in range(0, data.shape[0], batch_size):
        yield data[i:i+batch_size]
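A quick illustration (my addition): the generator yields successive slices, and the last batch may be smaller than batch_size:

    for b in gen_batches(np.arange(10).reshape(5, 2), batch_size=2):
        print(b.shape)   # (2, 2), (2, 2), (1, 2)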
def masking_noise(X, v):
    """ Apply masking noise to data in X: v elements of each sample of X
    (chosen at random, possibly with repetitions) are forced to zero.
    :param X: array_like, Input data
    :param v: int, number of elements to corrupt per sample
    :return: transformed data
    """
    X_noise = X.copy()
    n_samples = X.shape[0]
    n_features = X.shape[1]
    for i in range(n_samples):
        mask = np.random.randint(0, n_features, v)
        for m in mask:
            X_noise[i][m] = 0.
    return X_noise
def salt_and_pepper_noise(X, v):
    """ Apply salt and pepper noise to data in X: v elements of each sample of X
    (chosen at random, possibly with repetitions) are set to the minimum or maximum
    value found in X, according to a fair coin flip.
    :param X: array_like, Input data
    :param v: int, number of elements to corrupt per sample
    :return: transformed data
    """
    X_noise = X.copy()
    n_features = X.shape[1]
    mn = X.min()
    mx = X.max()
    for i, sample in enumerate(X):
        mask = np.random.randint(0, n_features, v)
        for m in mask:
            if np.random.random() < 0.5:
                X_noise[i][m] = mn
            else:
                X_noise[i][m] = mx
    return X_noise
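A minimal sanity check for the two corruptors (my addition, not part of the original file):

    X = np.random.rand(4, 8)            # 4 samples, 8 features in [0, 1]
    print(masking_noise(X, 2))          # up to 2 entries per row forced to 0
    print(salt_and_pepper_noise(X, 2))  # up to 2 entries per row set to X.min() or X.max()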
def gen_image(img, width, height, outfile, img_type='grey'):
    """ Save a flattened image array to disk as a width x height image. """
    assert len(img) == width * height or len(img) == width * height * 3
    if img_type == 'grey':
        misc.imsave(outfile, img.reshape(width, height))
    elif img_type == 'color':
        misc.imsave(outfile, img.reshape(3, width, height))
# --- run script (its file name is not given in the original; run_autoencoder.py is assumed below) ---
import tensorflow as tf
import autoencoder
import datasets
# #################### #
#   Flags definition   #
# #################### #
flags = tf.app.flags
FLAGS = flags.FLAGS
# Global configuration
flags.DEFINE_string('model_name', '', 'Model name.')
flags.DEFINE_string('dataset', 'mnist', 'Which dataset to use. ["mnist", "cifar10"]')
flags.DEFINE_string('cifar_dir', '', 'Path to the cifar 10 dataset directory.')
flags.DEFINE_integer('seed', -1, 'Seed for the random generators (>= 0). Useful for testing hyperparameters.')
flags.DEFINE_boolean('restore_previous_model', False, 'If true, restore previous model corresponding to model name.')
flags.DEFINE_boolean('encode_train', False, 'Whether to encode and store the training set.')
flags.DEFINE_boolean('encode_valid', False, 'Whether to encode and store the validation set.')
flags.DEFINE_boolean('encode_test', False, 'Whether to encode and store the test set.')
# Stacked Denoising Autoencoder specific parameters
flags.DEFINE_integer('n_components', 256, 'Number of hidden units in the dae.')
flags.DEFINE_string('corr_type', 'none', 'Type of input corruption. ["none", "masking", "salt_and_pepper"]')
flags.DEFINE_float('corr_frac', 0., 'Fraction of the input to corrupt.')
flags.DEFINE_integer('xavier_init', 1, 'Value for the constant in xavier weights initialization.')
flags.DEFINE_string('enc_act_func', 'tanh', 'Activation function for the encoder. ["sigmoid", "tanh"]')
flags.DEFINE_string('dec_act_func', 'none', 'Activation function for the decoder. ["sigmoid", "tanh", "none"]')
flags.DEFINE_string('main_dir', 'dae/', 'Directory to store data relative to the algorithm.')
flags.DEFINE_string('loss_func', 'mean_squared', 'Loss function. ["mean_squared", "cross_entropy"]')
flags.DEFINE_integer('verbose', 0, 'Level of verbosity. 0 - silent, 1 - print accuracy.')
flags.DEFINE_integer('weight_images', 0, 'Number of weight images to generate.')
flags.DEFINE_string('opt', 'gradient_descent', '["gradient_descent", "ada_grad", "momentum"]')
flags.DEFINE_float('learning_rate', 0.01, 'Initial learning rate.')
flags.DEFINE_float('momentum', 0.5, 'Momentum parameter.')
flags.DEFINE_integer('num_epochs', 10, 'Number of epochs.')
flags.DEFINE_integer('batch_size', 10, 'Size of each mini-batch.')
assert FLAGS.dataset in ['mnist', 'cifar10']
assert FLAGS.enc_act_func in ['sigmoid', 'tanh']
assert FLAGS.dec_act_func in ['sigmoid', 'tanh', 'none']
assert FLAGS.corr_type in ['masking', 'salt_and_pepper', 'none']
assert 0. <= FLAGS.corr_frac <= 1.
assert FLAGS.loss_func in ['cross_entropy', 'mean_squared']
assert FLAGS.opt in ['gradient_descent', 'ada_grad', 'momentum']
if __name__ == '__main__':

    if FLAGS.dataset == 'mnist':

        # ################# #
        #   MNIST Dataset   #
        # ################# #

        trX, vlX, teX = datasets.load_mnist_dataset(mode='unsupervised')

    elif FLAGS.dataset == 'cifar10':

        # ################### #
        #   Cifar10 Dataset   #
        # ################### #

        trX, teX = datasets.load_cifar10_dataset(FLAGS.cifar_dir, mode='unsupervised')
        vlX = teX[:5000]  # Validation set is the first half of the test set

    else:  # cannot be reached, just for completeness
        trX = None
        vlX = None
        teX = None

    # Create the object
    dae = autoencoder.DenoisingAutoencoder(
        seed=FLAGS.seed, model_name=FLAGS.model_name, n_components=FLAGS.n_components,
        enc_act_func=FLAGS.enc_act_func, dec_act_func=FLAGS.dec_act_func, xavier_init=FLAGS.xavier_init,
        corr_type=FLAGS.corr_type, corr_frac=FLAGS.corr_frac, dataset=FLAGS.dataset,
        loss_func=FLAGS.loss_func, main_dir=FLAGS.main_dir, opt=FLAGS.opt,
        learning_rate=FLAGS.learning_rate, momentum=FLAGS.momentum,
        verbose=FLAGS.verbose, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size)

    # Fit the model, monitoring the cost on the validation split
    dae.fit(trX, vlX, restore_previous_model=FLAGS.restore_previous_model)

    # Encode the three splits and optionally store them to disk
    dae.transform(trX, name='train', save=FLAGS.encode_train)
    dae.transform(vlX, name='validation', save=FLAGS.encode_valid)
    dae.transform(teX, name='test', save=FLAGS.encode_test)

    # save the learned filters as images
    dae.get_weights_as_images(28, 28, max_images=FLAGS.weight_images)
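Assuming the script above is saved as run_autoencoder.py (the file name is my assumption; only the flags themselves are defined in the code), a typical training run on MNIST with 10% masking noise would be:

    python run_autoencoder.py --model_name my_dae --dataset mnist --n_components 256 --corr_type masking --corr_frac 0.1 --num_epochs 10 --verbose 1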
# --- autoencoder.py: the DenoisingAutoencoder class (imported above as autoencoder) ---
import tensorflow as tf
import numpy as np
import os

import zconfig
import utils
class DenoisingAutoencoder(object):

    """ Implementation of Denoising Autoencoders using TensorFlow.
    The interface of the class is sklearn-like.
    """

    def __init__(self, model_name='dae', n_components=256, main_dir='dae/', enc_act_func='tanh',
                 dec_act_func='none', loss_func='mean_squared', num_epochs=10, batch_size=10, dataset='mnist',
                 xavier_init=1, opt='gradient_descent', learning_rate=0.01, momentum=0.5, corr_type='none',
                 corr_frac=0., verbose=1, seed=-1):
        """
        :param main_dir: main directory to put the models, data and summary directories
        :param n_components: number of hidden units
        :param enc_act_func: Activation function for the encoder. ['tanh', 'sigmoid']
        :param dec_act_func: Activation function for the decoder. ['tanh', 'sigmoid', 'none']
        :param loss_func: Loss function. ['mean_squared', 'cross_entropy']
        :param xavier_init: Value of the constant for xavier weights initialization
        :param opt: Which tensorflow optimizer to use. ['gradient_descent', 'momentum', 'ada_grad']
        :param learning_rate: Initial learning rate
        :param momentum: Momentum parameter
        :param corr_type: Type of input corruption. ["none", "masking", "salt_and_pepper"]
        :param corr_frac: Fraction of the input to corrupt.
        :param verbose: Level of verbosity. 0 - silent, 1 - print accuracy.
        :param num_epochs: Number of epochs
        :param batch_size: Size of each mini-batch
        :param dataset: Optional name for the dataset.
        :param seed: positive integer for seeding random generators. Ignored if < 0.
        """
        self.model_name = model_name
        self.n_components = n_components
        self.main_dir = main_dir
        self.enc_act_func = enc_act_func
        self.dec_act_func = dec_act_func
        self.loss_func = loss_func
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.dataset = dataset
        self.xavier_init = xavier_init
        self.opt = opt
        self.learning_rate = learning_rate
        self.momentum = momentum
        self.corr_type = corr_type
        self.corr_frac = corr_frac
        self.verbose = verbose
        self.seed = seed

        if self.seed >= 0:
            np.random.seed(self.seed)
            tf.set_random_seed(self.seed)

        self.models_dir, self.data_dir, self.tf_summary_dir = self._create_data_directories()
        self.model_path = self.models_dir + self.model_name

        self.input_data = None
        self.input_data_corr = None

        self.W_ = None
        self.bh_ = None
        self.bv_ = None

        self.encode = None
        self.decode = None

        self.train_step = None
        self.cost = None

        self.tf_session = None
        self.tf_merged_summaries = None
        self.tf_summary_writer = None
        self.tf_saver = None
    def fit(self, train_set, validation_set=None, restore_previous_model=False):
        """ Fit the model to the data.
        :param train_set: Training data.
        :param validation_set: optional, default None. Validation data.
        :param restore_previous_model:
            if true, a previously trained model
            with the same name as this model is restored from disk to continue training.
        :return: self
        """
        n_features = train_set.shape[1]

        self._build_model(n_features)

        with tf.Session() as self.tf_session:
            self._initialize_tf_utilities_and_ops(restore_previous_model)
            self._train_model(train_set, validation_set)
            self.tf_saver.save(self.tf_session, self.models_dir + self.model_name)
    def _initialize_tf_utilities_and_ops(self, restore_previous_model):
        """ Initialize TensorFlow operations: summaries, init operations, saver, summary_writer.
        Restore a previously trained model if the flag restore_previous_model is true.
        """
        self.tf_merged_summaries = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        self.tf_session.run(init_op)

        if restore_previous_model:
            self.tf_saver.restore(self.tf_session, self.model_path)

        self.tf_summary_writer = tf.train.SummaryWriter(self.tf_summary_dir, self.tf_session.graph_def)
    def _train_model(self, train_set, validation_set):
        """Train the model.
        :param train_set: training set
        :param validation_set: validation set. optional, default None
        :return: self
        """
        corruption_ratio = np.round(self.corr_frac * train_set.shape[1]).astype(int)

        for i in range(self.num_epochs):
            self._run_train_step(train_set, corruption_ratio)

            if i % 5 == 0:  # compute validation error every 5 epochs
                if validation_set is not None:
                    self._run_validation_error_and_summaries(i, validation_set)
    def _run_train_step(self, train_set, corruption_ratio):
        """ Run a training step. A training step consists of randomly corrupting the training set,
        shuffling it, dividing it into batches and running the optimizer once for each batch.
        :param train_set: training set
        :param corruption_ratio: number of elements to corrupt in each sample
        :return: self
        """
        x_corrupted = self._corrupt_input(train_set, corruption_ratio)

        shuff = list(zip(train_set, x_corrupted))  # list() is required in Python 3, where zip is lazy
        np.random.shuffle(shuff)

        batches = [_ for _ in utils.gen_batches(shuff, self.batch_size)]

        for batch in batches:
            x_batch, x_corr_batch = zip(*batch)
            tr_feed = {self.input_data: x_batch, self.input_data_corr: x_corr_batch}
            self.tf_session.run(self.train_step, feed_dict=tr_feed)
    def _corrupt_input(self, data, v):
        """ Corrupt v elements per sample of 'data' according to the
        noise method of this autoencoder.
        :return: corrupted data
        """
        if self.corr_type == 'masking':
            x_corrupted = utils.masking_noise(data, v)
        elif self.corr_type == 'salt_and_pepper':
            x_corrupted = utils.salt_and_pepper_noise(data, v)
        elif self.corr_type == 'none':
            x_corrupted = data
        else:
            x_corrupted = None

        return x_corrupted
    def _run_validation_error_and_summaries(self, epoch, validation_set):
        """ Run the summaries and error computation on the validation set.
        :param epoch: current epoch
        :param validation_set: validation data
        :return: self
        """
        # no corruption at validation time: the clean data is fed to both placeholders
        vl_feed = {self.input_data: validation_set, self.input_data_corr: validation_set}

        result = self.tf_session.run([self.tf_merged_summaries, self.cost], feed_dict=vl_feed)
        summary_str = result[0]
        err = result[1]

        self.tf_summary_writer.add_summary(summary_str, epoch)

        if self.verbose == 1:
            print("Validation cost at step %s: %s" % (epoch, err))
    def _build_model(self, n_features):
        """ Creates the computational graph.
        :type n_features: int
        :param n_features: Number of features.
        :return: self
        """
        self.input_data, self.input_data_corr = self._create_placeholders(n_features)
        self.W_, self.bh_, self.bv_ = self._create_variables(n_features)

        self._create_encode_layer()
        self._create_decode_layer()

        self._create_cost_function_node()
        self._create_train_step_node()
    def _create_placeholders(self, n_features):
        """ Create the TensorFlow placeholders for the model.
        :return: tuple(input_data(shape(None, n_features)),
                       input_data_corr(shape(None, n_features)))
        """
        input_data = tf.placeholder('float', [None, n_features], name='x-input')
        input_data_corr = tf.placeholder('float', [None, n_features], name='x-corr-input')

        return input_data, input_data_corr
    def _create_variables(self, n_features):
        """ Create the TensorFlow variables for the model.
        :return: tuple(weights(shape(n_features, n_components)),
                       hidden bias(shape(n_components)),
                       visible bias(shape(n_features)))
        """
        W_ = tf.Variable(utils.xavier_init(n_features, self.n_components, self.xavier_init), name='enc-w')
        bh_ = tf.Variable(tf.zeros([self.n_components]), name='hidden-bias')
        bv_ = tf.Variable(tf.zeros([n_features]), name='visible-bias')

        return W_, bh_, bv_
    def _create_encode_layer(self):
        """ Create the encoding layer of the network.
        :return: self
        """
        with tf.name_scope("W_x_bh"):
            if self.enc_act_func == 'sigmoid':
                self.encode = tf.nn.sigmoid(tf.matmul(self.input_data_corr, self.W_) + self.bh_)
            elif self.enc_act_func == 'tanh':
                self.encode = tf.nn.tanh(tf.matmul(self.input_data_corr, self.W_) + self.bh_)
            else:
                self.encode = None
    def _create_decode_layer(self):
        """ Create the decoding layer of the network.
        The decoder reuses the transposed encoder weights (tied weights),
        so the visible bias is its only new parameter.
        :return: self
        """
        with tf.name_scope("Wg_y_bv"):
            if self.dec_act_func == 'sigmoid':
                self.decode = tf.nn.sigmoid(tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_)
            elif self.dec_act_func == 'tanh':
                self.decode = tf.nn.tanh(tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_)
            elif self.dec_act_func == 'none':
                self.decode = tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_
            else:
                self.decode = None
    def _create_cost_function_node(self):
        """ create the cost function node of the network.
        :return: self
        """
        with tf.name_scope("cost"):
            if self.loss_func == 'cross_entropy':
                # note: this assumes self.decode is strictly positive (e.g. a sigmoid decoder);
                # otherwise tf.log can produce NaNs
                self.cost = - tf.reduce_sum(self.input_data * tf.log(self.decode))
                _ = tf.scalar_summary("cross_entropy", self.cost)
            elif self.loss_func == 'mean_squared':
                # this is actually the root mean squared error
                self.cost = tf.sqrt(tf.reduce_mean(tf.square(self.input_data - self.decode)))
                _ = tf.scalar_summary("mean_squared", self.cost)
            else:
                self.cost = None
    def _create_train_step_node(self):
        """ create the training step node of the network.
        :return: self
        """
        with tf.name_scope("train"):
            if self.opt == 'gradient_descent':
                self.train_step = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.cost)
            elif self.opt == 'ada_grad':
                self.train_step = tf.train.AdagradOptimizer(self.learning_rate).minimize(self.cost)
            elif self.opt == 'momentum':
                self.train_step = tf.train.MomentumOptimizer(self.learning_rate, self.momentum).minimize(self.cost)
            else:
                self.train_step = None
    def transform(self, data, name='train', save=False):
        """ Transform data according to the model.
        :param data: Data to transform
        :param name: Identifier for the data that is being encoded
        :param save: If true, save data to disk
        :return: transformed data
        """
        with tf.Session() as self.tf_session:
            self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)
            encoded_data = self.encode.eval({self.input_data_corr: data})

            if save:
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data
    def load_model(self, shape, model_path):
        """ Restore a previously trained model from disk.
        :param shape: tuple(n_features, n_components)
        :param model_path: path to the trained model
        :return: self, the trained model
        """
        self.n_components = shape[1]

        self._build_model(shape[0])

        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        with tf.Session() as self.tf_session:
            self.tf_session.run(init_op)
            self.tf_saver.restore(self.tf_session, model_path)
    def get_model_parameters(self):
        """ Return the model parameters in the form of numpy arrays.
        :return: model parameters
        """
        with tf.Session() as self.tf_session:
            self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)

            return {
                'enc_w': self.W_.eval(),
                'enc_b': self.bh_.eval(),
                'dec_b': self.bv_.eval()
            }
    def _create_data_directories(self):
        """ Create the three directories for storing respectively the models,
        the data generated by training and the TensorFlow summaries.
        :return: tuple of strings(models_dir, data_dir, summary_dir)
        """
        self.main_dir = self.main_dir + '/' if self.main_dir[-1] != '/' else self.main_dir

        models_dir = zconfig.models_dir + self.main_dir
        data_dir = zconfig.data_dir + self.main_dir
        summary_dir = zconfig.summary_dir + self.main_dir

        for d in [models_dir, data_dir, summary_dir]:
            if not os.path.isdir(d):
                os.makedirs(d)  # makedirs (not mkdir), so the base directories from zconfig are created too

        return models_dir, data_dir, summary_dir
    def get_weights_as_images(self, width, height, outdir='img/', max_images=10, model_path=None):
        """ Save the weights of this autoencoder as images, one image per hidden unit.
        Useful to visualize what the autoencoder has learned.
        :type width: int
        :param width: Width of the images
        :type height: int
        :param height: Height of the images
        :type outdir: string, default 'img/'
        :param outdir: Output directory for the images. This path is appended to self.data_dir
        :type max_images: int, default 10
        :param max_images: Number of images to save.
        """
        assert max_images <= self.n_components

        outdir = self.data_dir + outdir

        if not os.path.isdir(outdir):
            os.mkdir(outdir)

        with tf.Session() as self.tf_session:
            if model_path is not None:
                self.tf_saver.restore(self.tf_session, model_path)
            else:
                self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)

            enc_weights = self.W_.eval()

            perm = np.random.permutation(self.n_components)[:max_images]

            for p in perm:
                enc_w = np.array([i[p] for i in enc_weights])
                image_path = outdir + self.model_name + '-enc_weights_{}.png'.format(p)
                utils.gen_image(enc_w, width, height, image_path)
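To drive the class directly instead of going through the flags script, here is a minimal sketch (my addition, assuming the modules above are saved as zconfig.py, utils.py and autoencoder.py, and run with a pre-1.0 TensorFlow). The toy arrays stand in for real data:

    import numpy as np
    import autoencoder

    # Random data standing in for MNIST: samples with 784 features in [0, 1].
    train = np.random.rand(200, 784).astype('float32')
    valid = np.random.rand(50, 784).astype('float32')

    dae = autoencoder.DenoisingAutoencoder(
        model_name='toy-dae', n_components=64,
        enc_act_func='tanh', dec_act_func='none', loss_func='mean_squared',
        corr_type='masking', corr_frac=0.1,
        opt='gradient_descent', learning_rate=0.01,
        num_epochs=5, batch_size=10, verbose=1)

    dae.fit(train, valid)        # train, monitoring cost on the validation data
    codes = dae.transform(valid) # hidden representations of the validation data
    print(codes.shape)           # (50, 64)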