python Theanoによるロジスティック回帰の実装
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python Theanoによるロジスティック回帰の実装相关的知识,希望对你有一定的参考价值。
#coding: utf-8
import cPickle
import gzip
import os
import sys
import time
import numpy as np
import theano
import theano.tensor as T
# sklearn風のインタフェースで実装
class LogisticRegression(object):
def __init__(self, n_in, n_out):
# 重み行列
self.W = theano.shared(value=np.zeros((n_in, n_out), dtype=theano.config.floatX),
name='W',
borrow=True)
# バイアスベクトル
self.b = theano.shared(value=np.zeros((n_out,), dtype=theano.config.floatX),
name='b',
borrow=True)
self.params = [self.W, self.b]
def p_y_given_x(self, X):
return T.nnet.softmax(T.dot(X, self.W) + self.b)
def negative_log_likelihood(self, X, y):
# 式通りに計算するとmeanではなくsum
return -T.mean(T.log(self.p_y_given_x(X))[T.arange(y.shape[0]), y])
def errors(self, X, y):
# yは正解ラベル
if y.ndim != self.predict(X).ndim:
raise TypeError('y should have the same shape as self.predict',
('y', y.type, 'predict', self.predict.type))
if y.dtype.startswith('int'):
return T.mean(T.neq(self.predict(X), y))
else:
raise NotImplementedError()
def fit(self, train_set_x, train_set_y, valid_set_x, valid_set_y, learning_rate=0.13, n_epochs=1000, batch_size=600):
n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size
index = T.lscalar()
X = T.matrix('X')
y = T.ivector('y')
# コスト(最小化したい)
cost = self.negative_log_likelihood(X, y)
validate_model = theano.function(
inputs=[index],
outputs=self.errors(X, y),
givens={
X: valid_set_x[index * batch_size: (index + 1) * batch_size],
y: valid_set_y[index * batch_size: (index + 1) * batch_size]
})
# コスト関数の各パラメータでの微分を計算
g_W = T.grad(cost=cost, wrt=self.W)
g_b = T.grad(cost=cost, wrt=self.b)
updates = [(self.W, self.W - learning_rate * g_W),
(self.b, self.b - learning_rate * g_b)]
# この関数の呼び出し時にindexに具体的な値が初めて渡される
train_model = theano.function(
inputs=[index],
outputs=cost,
updates=updates,
givens={
X: train_set_x[index * batch_size: (index + 1) * batch_size],
y: train_set_y[index * batch_size: (index + 1) * batch_size]
})
# 訓練
print '... training the model'
# eary-stoppingパラメータ
patience = 5000
patience_increase = 2
improvement_threshold = 0.995
validation_frequency = min(n_train_batches, patience / 2)
best_validation_loss = np.inf
done_looping = False
epoch = 0
while (epoch < n_epochs) and (not done_looping):
epoch = epoch + 1
for minibatch_index in xrange(n_train_batches):
train_model(minibatch_index)
iter = (epoch - 1) * n_train_batches + minibatch_index
if (iter + 1) % validation_frequency == 0:
validation_losses = [validate_model(i) for i in xrange(n_valid_batches)]
this_validation_loss = np.mean(validation_losses)
print "epoch %i, minibatch %i/%i, validation error %f %%" % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100)
if this_validation_loss < best_validation_loss:
if this_validation_loss < best_validation_loss * improvement_threshold:
# 十分改善したならpatienceを上げて多くループを回せるようになる
patience = max(patience, iter * patience_increase)
best_validation_loss = this_validation_loss
# patienceを超えたらループを終了
if patience <= iter:
done_looping = True
break
def predict(self, X):
return T.argmax(self.p_y_given_x(X), axis=1)
def calc_error_rate(self, test_set_x, test_set_y, batch_size=600):
n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size
index = T.lscalar()
X = T.matrix('X')
y = T.ivector('y')
# 誤差を出力する関数をコンパイル
test_model = theano.function(
inputs=[index], # 入力となるシンボルのリスト
outputs=self.errors(X, y), # 出力となるシンボル
givens={ # ここで初めてシンボル x, y を具体的な値で置き換える
X: test_set_x[index * batch_size: (index + 1) * batch_size],
y: test_set_y[index * batch_size: (index + 1) * batch_size]
})
test_losses = [test_model(i) for i in xrange(n_test_batches)]
test_score = np.mean(test_losses)
return test_score
def load_data(dataset):
"""データセットをロードしてGPUの共有変数に格納"""
f = gzip.open(dataset, 'rb')
train_set, valid_set, test_set = cPickle.load(f)
f.close()
def shared_dataset(data_xy, borrow=True):
data_x, data_y = data_xy
shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)
return shared_x, T.cast(shared_y, 'int32')
test_set_x, test_set_y = shared_dataset(test_set)
valid_set_x, valid_set_y = shared_dataset(valid_set)
train_set_x, train_set_y = shared_dataset(train_set)
rval = [(train_set_x, train_set_y),
(valid_set_x, valid_set_y),
(test_set_x, test_set_y)]
return rval
def sgd_optimization_mnist(learning_rate=0.13, n_epochs=1000, batch_size=600):
# 学習データの準備
datasets = load_data('mnist.pkl.gz')
train_set_x, train_set_y = datasets[0]
valid_set_x, valid_set_y = datasets[1]
test_set_x, test_set_y = datasets[2]
# https://groups.google.com/forum/#!msg/theano-users/MnSM9VRISaQ/7cGVdU6upGkJ
# print train_set_x.get_value()
# print train_set_y.owner.inputs[0].get_value()
# MNISTの手書き数字を分類するロジスティック回帰モデル
# 入力は28ピクセルx28ピクセルの画像、出力は0から9のラベル
classifier = LogisticRegression(n_in=28*28, n_out=10)
start_time = time.clock()
classifier.fit(train_set_x, train_set_y, valid_set_x, valid_set_y)
end_time = time.clock()
test_score = classifier.calc_error_rate(test_set_x, test_set_y)
print "error rate: %f %%" % (test_score * 100)
if __name__ == "__main__":
sgd_optimization_mnist()
#coding: utf-8
import cPickle
import gzip
import os
import sys
import time
import numpy as np
import theano
import theano.tensor as T
class LogisticRegression(object):
def __init__(self, input, n_in, n_out):
# 重み行列
self.W = theano.shared(value=np.zeros((n_in, n_out), dtype=theano.config.floatX),
name='W',
borrow=True)
# バイアスベクトル
self.b = theano.shared(value=np.zeros((n_out,), dtype=theano.config.floatX),
name='b',
borrow=True)
# どちらもシンボルの演算(計算方法のみ)
# この時点でinputもxというシンボル
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
self.params = [self.W, self.b]
def negative_log_likelihood(self, y):
# 式通りに計算するとmeanではなくsum
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
def errors(self, y):
# yは正解ラベル
if y.ndim != self.y_pred.ndim:
raise TypeError('y should have the same shape as self.y_pred',
('y', y.type, 'y_pred', self.y_pred.type))
if y.dtype.startswith('int'):
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()
def load_data(dataset):
"""データセットをロードしてGPUの共有変数に格納"""
f = gzip.open(dataset, 'rb')
train_set, valid_set, test_set = cPickle.load(f)
f.close()
def shared_dataset(data_xy, borrow=True):
data_x, data_y = data_xy
shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)
return shared_x, T.cast(shared_y, 'int32')
test_set_x, test_set_y = shared_dataset(test_set)
valid_set_x, valid_set_y = shared_dataset(valid_set)
train_set_x, train_set_y = shared_dataset(train_set)
rval = [(train_set_x, train_set_y),
(valid_set_x, valid_set_y),
(test_set_x, test_set_y)]
return rval
def sgd_optimization_mnist(learning_rate=0.13, n_epochs=1000, batch_size=600):
# 学習データの準備
datasets = load_data('mnist.pkl.gz')
train_set_x, train_set_y = datasets[0]
valid_set_x, valid_set_y = datasets[1]
test_set_x, test_set_y = datasets[2]
n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size
n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size
# https://groups.google.com/forum/#!msg/theano-users/MnSM9VRISaQ/7cGVdU6upGkJ
print train_set_x.get_value()
print train_set_y.owner.inputs[0].get_value()
# シンボルの割り当て
# ミニバッチのインデックスを表すシンボル
index = T.lscalar()
# 学習データとラベルを表すシンボル
X = T.matrix('X')
y = T.ivector('y')
# MNISTの手書き数字を分類するロジスティック回帰モデル
# 入力は28ピクセルx28ピクセルの画像、出力は0から9のラベル
classifier = LogisticRegression(input=X, n_in=28*28, n_out=10)
# コスト(最小化したい)
cost = classifier.negative_log_likelihood(y)
# 誤差を出力する関数をコンパイル
test_model = theano.function(
inputs=[index], # 入力となるシンボルのリスト
outputs=classifier.errors(y), # 出力となるシンボル
givens={ # ここで初めてシンボル x, y を具体的な値で置き換える
X: test_set_x[index * batch_size: (index + 1) * batch_size],
y: test_set_y[index * batch_size: (index + 1) * batch_size]
})
validate_model = theano.function(
inputs=[index],
outputs=classifier.errors(y),
givens={
X: valid_set_x[index * batch_size: (index + 1) * batch_size],
y: valid_set_y[index * batch_size: (index + 1) * batch_size]
})
# コスト関数の各パラメータでの微分を計算
g_W = T.grad(cost=cost, wrt=classifier.W)
g_b = T.grad(cost=cost, wrt=classifier.b)
updates = [(classifier.W, classifier.W - learning_rate * g_W),
(classifier.b, classifier.b - learning_rate * g_b)]
# この関数の呼び出し時にindexに具体的な値が初めて渡される
train_model = theano.function(
inputs=[index],
outputs=cost,
updates=updates,
givens={
X: train_set_x[index * batch_size: (index + 1) * batch_size],
y: train_set_y[index * batch_size: (index + 1) * batch_size]
})
# 訓練
print '... training the model'
# eary-stoppingパラメータ
patience = 5000
patience_increase = 2
improvement_threshold = 0.995
validation_frequency = min(n_train_batches, patience / 2)
best_validation_loss = np.inf
test_score = 0
start_time = time.clock()
done_looping = False
epoch = 0
while (epoch < n_epochs) and (not done_looping):
epoch = epoch + 1
for minibatch_index in xrange(n_train_batches):
minibatch_avg_cost = train_model(minibatch_index)
iter = (epoch - 1) * n_train_batches + minibatch_index
if (iter + 1) % validation_frequency == 0:
validation_losses = [validate_model(i) for i in xrange(n_valid_batches)]
this_validation_loss = np.mean(validation_losses)
print "epoch %i, minibatch %i/%i, validation error %f %%" % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100)
if this_validation_loss < best_validation_loss:
if this_validation_loss < best_validation_loss * improvement_threshold:
# 十分改善したならpatienceを上げて多くループを回せるようになる
patience = max(patience, iter * patience_increase)
best_validation_loss = this_validation_loss
test_losses = [test_model(i) for i in xrange(n_test_batches)]
test_score = np.mean(test_losses)
print " epoch %i, minibatch %i/%i, test error of best model %f %%" % (epoch, minibatch_index + 1, n_train_batches, test_score * 100)
# patienceを超えたらループを終了
if patience <= iter:
done_looping = True
break
end_time = time.clock()
print "Optimization complete with best validation score of %f %%, with test performance %f %%" % (best_validation_loss * 100, test_score * 100)
print "The code run for %d epochs, with %f epochs/sec" % (epoch, 1.0 * epoch / (end_time - start_time))
if __name__ == "__main__":
sgd_optimization_mnist()
#coding: utf-8
import numpy as np
import cPickle
import gzip
from sklearn import linear_model, datasets
from sklearn.metrics.metrics import confusion_matrix, classification_report
# scikit-learnを用いたロジスティック回帰
def load_data(dataset):
f = gzip.open(dataset, 'rb')
train_set, valid_set, test_set = cPickle.load(f)
f.close()
train_set_x, train_set_y = train_set
valid_set_x, valid_set_y = valid_set
test_set_x, test_set_y = test_set
rval = [(train_set_x, train_set_y),
(valid_set_x, valid_set_y),
(test_set_x, test_set_y)]
return rval
if __name__ == "__main__":
datasets = load_data('mnist.pkl.gz')
train_set_x, train_set_y = datasets[0]
valid_set_x, valid_set_y = datasets[1]
test_set_x, test_set_y = datasets[2]
print train_set_x.shape
print train_set_y.shape
logreg = linear_model.LogisticRegression()
logreg.fit(train_set_x, train_set_y)
predictions = logreg.predict(test_set_x)
print confusion_matrix(test_set_y, predictions)
print classification_report(test_set_y, predictions)
以上是关于python Theanoによるロジスティック回帰の実装的主要内容,如果未能解决你的问题,请参考以下文章