Implementing Logistic Regression in Python with Theano

This post implements logistic regression on MNIST in three ways: first with an sklearn-style fit/predict interface built on top of Theano, then in the style of the Deep Learning Tutorials, and finally with scikit-learn itself for comparison. The file mnist.pkl.gz is the pickled MNIST dataset used by the Deep Learning Tutorials.
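Every version below follows the same Theano pattern: expressions are built symbolically first, and only become callable once compiled with theano.function. As a warm-up, a minimal sketch of that idea:

import theano
import theano.tensor as T

a = T.dscalar('a')  # symbolic scalar; holds no value yet
b = T.dscalar('b')
f = theano.function(inputs=[a, b], outputs=a + b)  # compile the graph

print f(2, 3)  # concrete values are supplied only here -> 5.0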

#coding: utf-8
import cPickle
import gzip
import time

import numpy as np

import theano
import theano.tensor as T

# Implemented with an sklearn-style interface (fit / predict)

class LogisticRegression(object):
    def __init__(self, n_in, n_out):
        # Weight matrix (initialized to zeros)
        self.W = theano.shared(value=np.zeros((n_in, n_out), dtype=theano.config.floatX),
                               name='W',
                               borrow=True)

        # Bias vector (initialized to zeros)
        self.b = theano.shared(value=np.zeros((n_out,), dtype=theano.config.floatX),
                               name='b',
                               borrow=True)

        self.params = [self.W, self.b]

    def p_y_given_x(self, X):
        # Class-membership probabilities: softmax(X.W + b)
        return T.nnet.softmax(T.dot(X, self.W) + self.b)

    def negative_log_likelihood(self, X, y):
        # Taken literally from the formula this would be a sum, not a mean;
        # the mean makes the cost less dependent on the batch size
        return -T.mean(T.log(self.p_y_given_x(X))[T.arange(y.shape[0]), y])

    def errors(self, X, y):
        # y is the vector of correct labels
        if y.ndim != self.predict(X).ndim:
            raise TypeError('y should have the same shape as self.predict(X)',
                            ('y', y.type, 'predict', self.predict(X).type))
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.predict(X), y))
        else:
            raise NotImplementedError()

    def fit(self, train_set_x, train_set_y, valid_set_x, valid_set_y, learning_rate=0.13, n_epochs=1000, batch_size=600):
        n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size

        index = T.lscalar()

        X = T.matrix('X')
        y = T.ivector('y')

        # Cost to minimize
        cost = self.negative_log_likelihood(X, y)

        validate_model = theano.function(
            inputs=[index],
            outputs=self.errors(X, y),
            givens={
                X: valid_set_x[index * batch_size: (index + 1) * batch_size],
                y: valid_set_y[index * batch_size: (index + 1) * batch_size]
            })

        # Gradients of the cost with respect to each parameter
        g_W = T.grad(cost=cost, wrt=self.W)
        g_b = T.grad(cost=cost, wrt=self.b)

        updates = [(self.W, self.W - learning_rate * g_W),
                   (self.b, self.b - learning_rate * g_b)]

        # A concrete value for index is supplied only when this function is called
        train_model = theano.function(
            inputs=[index],
            outputs=cost,
            updates=updates,
            givens={
                X: train_set_x[index * batch_size: (index + 1) * batch_size],
                y: train_set_y[index * batch_size: (index + 1) * batch_size]
            })

        # Training loop
        print '... training the model'

        # Early-stopping parameters
        patience = 5000                # look at at least this many minibatches
        patience_increase = 2          # factor by which patience grows on a significant improvement
        improvement_threshold = 0.995  # a relative improvement this large counts as significant
        validation_frequency = min(n_train_batches, patience / 2)

        best_validation_loss = np.inf
        done_looping = False
        epoch = 0
        while (epoch < n_epochs) and (not done_looping):
            epoch = epoch + 1
            for minibatch_index in xrange(n_train_batches):
                train_model(minibatch_index)

                iter = (epoch - 1) * n_train_batches + minibatch_index
                if (iter + 1) % validation_frequency == 0:
                    validation_losses = [validate_model(i) for i in xrange(n_valid_batches)]
                    this_validation_loss = np.mean(validation_losses)
                    print "epoch %i, minibatch %i/%i, validation error %f %%" % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100)

                    if this_validation_loss < best_validation_loss:
                        if this_validation_loss < best_validation_loss * improvement_threshold:
                            # If the improvement is significant, raise patience so the loop can run longer
                            patience = max(patience, iter * patience_increase)
                        best_validation_loss = this_validation_loss

                # Stop once patience is exceeded
                if patience <= iter:
                    done_looping = True
                    break

    def predict(self, X):
        return T.argmax(self.p_y_given_x(X), axis=1)

    def calc_error_rate(self, test_set_x, test_set_y, batch_size=600):
        n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size
        index = T.lscalar()

        X = T.matrix('X')
        y = T.ivector('y')

        # Compile a function that returns the error rate
        test_model = theano.function(
            inputs=[index],             # list of input symbols
            outputs=self.errors(X, y),  # output symbol
            givens={  # here the symbols X and y are first bound to concrete values
                X: test_set_x[index * batch_size: (index + 1) * batch_size],
                y: test_set_y[index * batch_size: (index + 1) * batch_size]
            })
        test_losses = [test_model(i) for i in xrange(n_test_batches)]
        test_score = np.mean(test_losses)
        return test_score

def load_data(dataset):
    """データセットをロードしてGPUの共有変数に格納"""
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    f.close()

    def shared_dataset(data_xy, borrow=True):
        data_x, data_y = data_xy
        shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
        shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)
        return shared_x, T.cast(shared_y, 'int32')

    test_set_x, test_set_y = shared_dataset(test_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    train_set_x, train_set_y = shared_dataset(train_set)

    rval = [(train_set_x, train_set_y),
            (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
    return rval

def sgd_optimization_mnist(learning_rate=0.13, n_epochs=1000, batch_size=600):
    # Prepare the training data
    datasets = load_data('mnist.pkl.gz')

    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    # https://groups.google.com/forum/#!msg/theano-users/MnSM9VRISaQ/7cGVdU6upGkJ
#     print train_set_x.get_value()
#     print train_set_y.owner.inputs[0].get_value()

    # Logistic regression model that classifies MNIST handwritten digits
    # Input: 28x28-pixel images; output: labels 0 through 9
    classifier = LogisticRegression(n_in=28*28, n_out=10)

    start_time = time.clock()
    classifier.fit(train_set_x, train_set_y, valid_set_x, valid_set_y)
    end_time = time.clock()
    print "training time: %.1f s" % (end_time - start_time)

    test_score = classifier.calc_error_rate(test_set_x, test_set_y)

    print "error rate: %f %%" % (test_score * 100)

if __name__ == "__main__":
    sgd_optimization_mnist()
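One detail worth calling out in negative_log_likelihood: the expression T.log(...)[T.arange(y.shape[0]), y] uses row-wise fancy indexing to pick out, for each sample, the log-probability assigned to its correct class. The same idea in plain NumPy, with made-up toy values:

import numpy as np

# Toy softmax outputs: 3 samples, 4 classes (hypothetical values)
p = np.array([[0.7, 0.1, 0.1, 0.1],
              [0.2, 0.5, 0.2, 0.1],
              [0.1, 0.1, 0.1, 0.7]])
y = np.array([0, 1, 3])  # correct labels

# p[i, y[i]] for each row i
picked = p[np.arange(y.shape[0]), y]  # -> [0.7, 0.5, 0.7]
print -np.mean(np.log(picked))        # the negative log-likelihood

The second version below stays closer to the original Deep Learning Tutorials code: the symbolic expressions are built once in __init__, the training loop lives in sgd_optimization_mnist, and the test error is re-evaluated whenever a new best validation score appears.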
#coding: utf-8
import cPickle
import gzip
import time

import numpy as np

import theano
import theano.tensor as T

class LogisticRegression(object):
    def __init__(self, input, n_in, n_out):
        # Weight matrix (initialized to zeros)
        self.W = theano.shared(value=np.zeros((n_in, n_out), dtype=theano.config.floatX),
                               name='W',
                               borrow=True)

        # Bias vector (initialized to zeros)
        self.b = theano.shared(value=np.zeros((n_out,), dtype=theano.config.floatX),
                               name='b',
                               borrow=True)

        # Both are purely symbolic expressions (they define how to compute, not values)
        # At this point input is still just the symbol X, carrying no data
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        # Taken literally from the formula this would be a sum, not a mean;
        # the mean makes the cost less dependent on the batch size
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        # y is the vector of correct labels
        if y.ndim != self.y_pred.ndim:
            raise TypeError('y should have the same shape as self.y_pred',
                            ('y', y.type, 'y_pred', self.y_pred.type))
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()

def load_data(dataset):
    """データセットをロードしてGPUの共有変数に格納"""
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    f.close()

    def shared_dataset(data_xy, borrow=True):
        data_x, data_y = data_xy
        shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
        shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)
        return shared_x, T.cast(shared_y, 'int32')

    test_set_x, test_set_y = shared_dataset(test_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    train_set_x, train_set_y = shared_dataset(train_set)

    rval = [(train_set_x, train_set_y),
            (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
    return rval

def sgd_optimization_mnist(learning_rate=0.13, n_epochs=1000, batch_size=600):
    # Prepare the training data
    datasets = load_data('mnist.pkl.gz')

    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
    n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size
    n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size

    # https://groups.google.com/forum/#!msg/theano-users/MnSM9VRISaQ/7cGVdU6upGkJ
    # Debug: inspect the shared variables (see the URL above)
    # print train_set_x.get_value()
    # print train_set_y.owner.inputs[0].get_value()

    # Allocate symbolic variables
    # Symbol for the minibatch index
    index = T.lscalar()

    # Symbols for the training data and labels
    X = T.matrix('X')
    y = T.ivector('y')

    # Logistic regression model that classifies MNIST handwritten digits
    # Input: 28x28-pixel images; output: labels 0 through 9
    classifier = LogisticRegression(input=X, n_in=28*28, n_out=10)

    # Cost to minimize
    cost = classifier.negative_log_likelihood(y)

    # Compile a function that returns the error rate
    test_model = theano.function(
        inputs=[index],                # list of input symbols
        outputs=classifier.errors(y),  # output symbol
        givens={  # here the symbols X and y are first bound to concrete values
            X: test_set_x[index * batch_size: (index + 1) * batch_size],
            y: test_set_y[index * batch_size: (index + 1) * batch_size]
        })

    validate_model = theano.function(
        inputs=[index],
        outputs=classifier.errors(y),
        givens={
            X: valid_set_x[index * batch_size: (index + 1) * batch_size],
            y: valid_set_y[index * batch_size: (index + 1) * batch_size]
        })

    # Gradients of the cost with respect to each parameter
    g_W = T.grad(cost=cost, wrt=classifier.W)
    g_b = T.grad(cost=cost, wrt=classifier.b)

    updates = [(classifier.W, classifier.W - learning_rate * g_W),
               (classifier.b, classifier.b - learning_rate * g_b)]

    # A concrete value for index is supplied only when this function is called
    train_model = theano.function(
        inputs=[index],
        outputs=cost,
        updates=updates,
        givens={
            X: train_set_x[index * batch_size: (index + 1) * batch_size],
            y: train_set_y[index * batch_size: (index + 1) * batch_size]
        })

    # Training loop
    print '... training the model'

    # Early-stopping parameters
    patience = 5000                # look at at least this many minibatches
    patience_increase = 2          # factor by which patience grows on a significant improvement
    improvement_threshold = 0.995  # a relative improvement this large counts as significant
    validation_frequency = min(n_train_batches, patience / 2)

    best_validation_loss = np.inf
    test_score = 0
    start_time = time.clock()

    done_looping = False
    epoch = 0
    while (epoch < n_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in xrange(n_train_batches):
            minibatch_avg_cost = train_model(minibatch_index)

            iter = (epoch - 1) * n_train_batches + minibatch_index
            if (iter + 1) % validation_frequency == 0:
                validation_losses = [validate_model(i) for i in xrange(n_valid_batches)]
                this_validation_loss = np.mean(validation_losses)
                print "epoch %i, minibatch %i/%i, validation error %f %%" % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100)

                if this_validation_loss < best_validation_loss:
                    if this_validation_loss < best_validation_loss * improvement_threshold:
                        # If the improvement is significant, raise patience so the loop can run longer
                        patience = max(patience, iter * patience_increase)
                    best_validation_loss = this_validation_loss

                    test_losses = [test_model(i) for i in xrange(n_test_batches)]
                    test_score = np.mean(test_losses)
                    print "    epoch %i, minibatch %i/%i, test error of best model %f %%" % (epoch, minibatch_index + 1, n_train_batches, test_score * 100)

            # Stop once patience is exceeded
            if patience <= iter:
                done_looping = True
                break

    end_time = time.clock()
    print "Optimization complete with best validation score of %f %%, with test performance %f %%" % (best_validation_loss * 100, test_score * 100)
    print "The code run for %d epochs, with %f epochs/sec" % (epoch, 1.0 * epoch / (end_time - start_time))

if __name__ == "__main__":
    sgd_optimization_mnist()
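The givens mechanism used by every compiled function above is the key trick: the whole dataset sits in a shared variable (on the GPU when one is available), and each call substitutes only a single minibatch slice for the input symbols, avoiding a host-to-device copy per batch. A minimal sketch of the pattern, using a small made-up shared vector:

import numpy as np
import theano
import theano.tensor as T

data = theano.shared(np.arange(10, dtype=theano.config.floatX))
v = T.vector('v')
index = T.lscalar()
batch_size = 2

# At call time the symbol v is replaced by a slice of the shared data
f = theano.function(inputs=[index],
                    outputs=v.sum(),
                    givens={v: data[index * batch_size: (index + 1) * batch_size]})

print f(0)  # data[0:2].sum() -> 1.0
print f(3)  # data[6:8].sum() -> 13.0

Finally, the same classification task with scikit-learn's LogisticRegression, for comparison: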
#coding: utf-8
import cPickle
import gzip
from sklearn import linear_model
from sklearn.metrics import confusion_matrix, classification_report

# Logistic regression with scikit-learn

def load_data(dataset):
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    f.close()

    train_set_x, train_set_y = train_set
    valid_set_x, valid_set_y = valid_set
    test_set_x, test_set_y = test_set

    rval = [(train_set_x, train_set_y),
            (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
    return rval

if __name__ == "__main__":
    datasets = load_data('mnist.pkl.gz')

    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    print train_set_x.shape
    print train_set_y.shape

    logreg = linear_model.LogisticRegression()
    logreg.fit(train_set_x, train_set_y)
    predictions = logreg.predict(test_set_x)
    print confusion_matrix(test_set_y, predictions)
    print classification_report(test_set_y, predictions)
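For a number directly comparable to the error rate printed by the Theano versions, the misclassification rate is simply one minus the accuracy. A small addition, assuming the script above has run:

    # Error rate = 1 - accuracy, comparable to the Theano versions above
    print "error rate: %f %%" % ((1.0 - logreg.score(test_set_x, test_set_y)) * 100)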
