深度学习算法实践12---卷积神经网络(CNN)实现
Posted 最老程序员闫涛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度学习算法实践12---卷积神经网络(CNN)实现相关的知识,希望对你有一定的参考价值。
在搞清楚卷积神经网络(CNN)的原理之后,在本篇博文中,我们将讨论基于Theano的算法实现技术。我们还将以MNIST手写数字识别为例,创建卷积神经网络(CNN),训练该网络,使识别误差达到1%以内。
我们首先需要读入MNIST手写数字识别的训练样本集,为此我们定义了一个工具类:
from __future__ import print_function
__docformat__ = 'restructedtext en'
import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit
import numpy
import theano
import theano.tensor as T
class MnistLoader(object):
def load_data(self, dataset):
data_dir, data_file = os.path.split(dataset)
if data_dir == "" and not os.path.isfile(dataset):
new_path = os.path.join(
os.path.split(__file__)[0],
"..",
"data",
dataset
)
if os.path.isfile(new_path) or data_file == 'mnist.pkl.gz':
dataset = new_path
if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz':
from six.moves import urllib
origin = (
'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
)
print('Downloading data from %s' % origin)
urllib.request.urlretrieve(origin, dataset)
print('... loading data')
# Load the dataset
with gzip.open(dataset, 'rb') as f:
try:
train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
except:
train_set, valid_set, test_set = pickle.load(f)
def shared_dataset(data_xy, borrow=True):
data_x, data_y = data_xy
shared_x = theano.shared(numpy.asarray(data_x,
dtype=theano.config.floatX),
borrow=borrow)
shared_y = theano.shared(numpy.asarray(data_y,
dtype=theano.config.floatX),
borrow=borrow)
return shared_x, T.cast(shared_y, 'int32')
test_set_x, test_set_y = shared_dataset(test_set)
valid_set_x, valid_set_y = shared_dataset(valid_set)
train_set_x, train_set_y = shared_dataset(train_set)
rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y),
(test_set_x, test_set_y)]
return rval
这个类在之前我们已经用过,在这里就不详细讲解了。之所以单独定义这个类,是因为如果我们将问题换为其他类型时,我们只需要修改这一个类,就可以实现训练数据的载入了,这样简化了程序修改工作量。
我们所采用的方法是将图像先接入卷积神经网络,之后再接入BP网络的隐藏层,然后再接入逻辑回归的输出层,因此我们需要先定义多层前向网络的隐藏层和逻辑回归输出层。隐藏层的定义如下所示:
from __future__ import print_function
__docformat__ = 'restructedtext en'
import os
import sys
import timeit
import numpy
import theano
import theano.tensor as T
from logistic_regression import LogisticRegression
# start-snippet-1
class HiddenLayer(object):
def __init__(self, rng, input, n_in, n_out, W=None, b=None,
activation=T.tanh):
self.input = input
if W is None:
W_values = numpy.asarray(
rng.uniform(
low=-numpy.sqrt(6. / (n_in + n_out)),
high=numpy.sqrt(6. / (n_in + n_out)),
size=(n_in, n_out)
),
dtype=theano.config.floatX
)
if activation == theano.tensor.nnet.sigmoid:
W_values *= 4
W = theano.shared(value=W_values, name='W', borrow=True)
if b is None:
b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
b = theano.shared(value=b_values, name='b', borrow=True)
self.W = W
self.b = b
lin_output = T.dot(input, self.W) + self.b
self.output = (
lin_output if activation is None
else activation(lin_output)
)
# parameters of the model
self.params = [self.W, self.b]
接下来我们定义逻辑回归算法类:
from __future__ import print_function
__docformat__ = 'restructedtext en'
import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit
import numpy
import theano
import theano.tensor as T
class LogisticRegression(object):
def __init__(self, input, n_in, n_out):
self.W = theano.shared(
value=numpy.zeros(
(n_in, n_out),
dtype=theano.config.floatX
),
name='W',
borrow=True
)
self.b = theano.shared(
value=numpy.zeros(
(n_out,),
dtype=theano.config.floatX
),
name='b',
borrow=True
)
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
self.params = [self.W, self.b]
self.input = input
print("Yantao: ***********************************")
def negative_log_likelihood(self, y):
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
def errors(self, y):
if y.ndim != self.y_pred.ndim:
raise TypeError(
'y should have the same shape as self.y_pred',
('y', y.type, 'y_pred', self.y_pred.type)
)
if y.dtype.startswith('int'):
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()
这段代码在逻辑回归博文中已经详细讨论过了,这里就不再重复了,有兴趣的读者可以查看这篇博文(逻辑回归算法实现)。
做完上述准备工作之后,我们就可以开始卷积神经网络(CNN)实现了。我们先来定义基于简化版Lenet5的卷积神经网络(CNN)的定义,代码如下所示:
from __future__ import print_function
import os
import sys
import timeit
import numpy
import theano
import theano.tensor as T
from theano.tensor.signal import pool
from theano.tensor.nnet import conv2d
class LeNetConvPoolLayer(object):
def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
assert image_shape[1] == filter_shape[1]
self.input = input
fan_in = numpy.prod(filter_shape[1:])
fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) //
numpy.prod(poolsize))
W_bound = numpy.sqrt(6. / (fan_in + fan_out))
self.W = theano.shared(
numpy.asarray(
rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
dtype=theano.config.floatX
),
borrow=True
)
b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
self.b = theano.shared(value=b_values, borrow=True)
conv_out = conv2d(
input=input,
filters=self.W,
filter_shape=filter_shape,
input_shape=image_shape
)
pooled_out = pool.pool_2d(
input=conv_out,
ds=poolsize,
ignore_border=True
)
self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
self.params = [self.W, self.b]
self.input = input
上面代码实现了对输入信号的卷积操作,并对结果进行最大化池化。
下面我们来看怎样初始化Lenet层,怎样将Lenet层输出信号转为MLP网络隐藏层的输入信号,具体代码如下所示:
layer0 = LeNetConvPoolLayer(
rng,
input=layer0_input,
image_shape=(batch_size, 1, 28, 28),
filter_shape=(nkerns[0], 1, 5, 5),
poolsize=(2, 2)
)
如上所示,我们的输入信号是28*28的黑白图像,而且我们采用的批量学习,因此输入图像就定义为(batch_size, 1, 28, 28),我们对图像进行5*5卷积操作,根据卷积操作定义,最终得到的卷积输出层为(28-5+1,28-5+1)=(24,24)的“图像”,我们采用2*2的最大池化操作,即取2*2区域像素的最大值作为新的像素点的值,则最终输出层得到12*12的输出信号。
接下来,我们将输出信号继续输入一个Lenet卷积池化层,代码如下所示:
layer1 = LeNetConvPoolLayer(
rng,
input=layer0.output,
image_shape=(batch_size, nkerns[0], 12, 12),
filter_shape=(nkerns[1], nkerns[0], 5, 5),
poolsize=(2, 2)
)
如上所示,这时输入信号变化为12*12的图像,我们还使用5*5的卷积核,可以得到(12-5+1, 12-5+1)=(8,8)的图像,采用2*2最大池化操作后,得到(4,4)图像。可以通过调用layer1.output.flatten(2)将其变为一维信号,从而输入MLP的隐藏层。
下面我们定义Lenet引擎来实现装入数据,定义网络模型,训练网络工作,代码如下所示:
from __future__ import print_function
import os
import sys
import timeit
import numpy
import theano
import theano.tensor as T
from theano.tensor.signal import pool
from theano.tensor.nnet import conv2d
from mnist_loader import MnistLoader
from logistic_regression import LogisticRegression
from hidden_layer import HiddenLayer
from lenet_conv_pool_layer import LeNetConvPoolLayer
class LenetMnistEngine(object):
def __init__(self):
print("create LenetMnistEngine")
def train_model(self):
learning_rate = 0.1
n_epochs = 200
dataset = 'mnist.pkl.gz'
nkerns = [20, 50]
batch_size = 500
(n_train_batches, n_test_batches, n_valid_batches, \\
train_model, test_model, validate_model) = \\
self.build_model(learning_rate, n_epochs, \\
dataset, nkerns, batch_size)
self.train(n_epochs, n_train_batches, n_test_batches, \\
n_valid_batches, train_model, test_model, \\
validate_model)
def run(self):
print("run the model")
classifier = pickle.load(open('best_model.pkl', 'rb'))
predict_model = theano.function(
inputs=[classifier.input],
outputs=classifier.logRegressionLayer.y_pred
)
dataset='mnist.pkl.gz'
loader = MnistLoader()
datasets = loader.load_data(dataset)
test_set_x, test_set_y = datasets[2]
test_set_x = test_set_x.get_value()
predicted_values = predict_model(test_set_x[:10])
print("Predicted values for the first 10 examples in test set:")
print(predicted_values)
def build_model(self, learning_rate=0.1, n_epochs=200,
dataset='mnist.pkl.gz',
nkerns=[20, 50], batch_size=500):
rng = numpy.random.RandomState(23455)
loader = MnistLoader()
datasets = loader.load_data(dataset)
train_set_x, train_set_y = datasets[0]
valid_set_x, valid_set_y = datasets[1]
test_set_x, test_set_y = datasets[2]
n_train_batches = train_set_x.get_value(borrow=True).shape[0]
n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
n_test_batches = test_set_x.get_value(borrow=True).shape[0]
n_train_batches //= batch_size
n_valid_batches //= batch_size
n_test_batches //= batch_size
index = T.lscalar()
x = T.matrix('x')
y = T.ivector('y')
print('... building the model')
layer0_input = x.reshape((batch_size, 1, 28, 28))
layer0 = LeNetConvPoolLayer(
rng,
input=layer0_input,
image_shape=(batch_size, 1, 28, 28),
filter_shape=(nkerns[0], 1, 5, 5),
poolsize=(2, 2)
)
layer1 = LeNetConvPoolLayer(
rng,
input=layer0.output,
image_shape=(batch_size, nkerns[0], 12, 12),
filter_shape=(nkerns[1], nkerns[0], 5, 5),
poolsize=(2, 2)
)
layer2_input = layer1.output.flatten(2)
layer2 = HiddenLayer(
rng,
input=layer2_input,
n_in=nkerns[1] * 4 * 4,
n_out=500,
activation=T.tanh
)
layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)
cost = layer3.negative_log_likelihood(y)
test_model = theano.function(
[index],
layer3.errors(y),
givens={
x: test_set_x[index * batch_size: (index + 1) * batch_size],
y: test_set_y[index * batch_size: (index + 1) * batch_size]
}
)
validate_model = theano.function(
[index],
layer3.errors(y),
givens={
x: valid_set_x[index * batch_size: (index + 1) * batch_size],
y: valid_set_y[index * batch_size: (index + 1) * batch_size]
}
)
params = layer3.params + layer2.params + layer1.params + layer0.params
grads = T.grad(cost, params)
updates = [
(param_i, param_i - learning_rate * grad_i)
for param_i, grad_i in zip(params, grads)
]
train_model = theano.function(
[index],
cost,
updates=updates,
givens={
x: train_set_x[index * batch_size: (index + 1) * batch_size],
y: train_set_y[index * batch_size: (index + 1) * batch_size]
}
)
return (n_train_batches, n_test_batches, n_valid_batches, \\
train_model, test_model, validate_model)
def train(self, n_epochs, n_train_batches, n_test_batches, n_valid_batches,
train_model, test_model, validate_model):
print('... training')
patience = 10000
patience_increase = 2
improvement_threshold = 0.995
validation_frequency = min(n_train_batches, patience // 2)
best_validation_loss = numpy.inf
best_iter = 0
test_score = 0.
start_time = timeit.default_timer()
epoch = 0
done_looping = False
while (epoch < n_epochs) and (not done_looping):
epoch = epoch + 1
for minibatch_index in range(n_train_batches):
iter = (epoch - 1) * n_train_batches + minibatch_index
if iter % 100 == 0:
print('training @ iter = ', iter)
cost_ij = train_model(minibatch_index)
if (iter + 1) % validation_frequency == 0:
validation_losses = [validate_model(i) for i
in range(n_valid_batches)]
this_validation_loss = numpy.mean(validation_losses)
print('epoch %i, minibatch %i/%i, validation error %f %%' %
(epoch, minibatch_index + 1, n_train_batches,
this_validation_loss * 100.))
if this_validation_loss < best_validation_loss:
if this_validation_loss < best_validation_loss * \\
improvement_threshold:
patience = max(patience, iter * patience_increase)
best_validation_loss = this_validation_loss
best_iter = iter
test_losses = [
test_model(i)
for i in range(n_test_batches)
]
test_score = numpy.mean(test_losses)
with open('best_model.pkl', 'wb') as f:
pickle.dump(classifier, f)
print((' epoch %i, minibatch %i/%i, test error of '
'best model %f %%') %
(epoch, minibatch_index + 1, n_train_batches,
test_score * 100.))
if patience <= iter:
done_looping = True
break
end_time = timeit.default_timer()
print('Optimization complete.')
print('Best validation score of %f %% obtained at iteration %i, '
'with test performance %f %%' %
(best_validation_loss * 100., best_iter + 1, test_score * 100.))
print(('The code for file ' +
os.path.split(__file__)[1] +
' ran for %.2fm' % ((end_time - start_time) / 60.)), file=sys.stderr)
上述代码与之前的MLP的训练代码类似,这里就不再讨论了。在我的Mac笔记本上,运行大约6个小时,会得到错误率小于1%的结果。
以上是关于深度学习算法实践12---卷积神经网络(CNN)实现的主要内容,如果未能解决你的问题,请参考以下文章
深度学习实战——卷积神经网络/CNN实践(LeNetResnet)
深度学习与图神经网络核心技术实践应用高级研修班-Day1卷积神经网络(CNN详解)
深度学习与图神经网络核心技术实践应用高级研修班-Day1卷积神经网络和循环神经网络