Python TensorFlow实现Sequential深度神经网络回归

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python TensorFlow实现Sequential深度神经网络回归相关的知识,希望对你有一定的参考价值。

  本文介绍基于Python语言中TensorFlowKeras接口,实现深度神经网络回归的方法。

(基于Python TensorFlow Keras的深度学习回归代码——keras.Sequential深度神经网络)

1 写在前面

  前期一篇文章TensorFlow DNNRegressor实现深度学习的代码详细介绍了基于TensorFlow tf.estimator接口的深度学习网络;而在TensorFlow 2.0中,新的Keras接口具有与 tf.estimator接口一致的功能,且其更易于学习,对于新手而言友好程度更高;在TensorFlow官网也建议新手从Keras接口入手开始学习。因此,本文结合TensorFlow Keras接口,加以深度学习回归的详细介绍与代码实战。

  和上述博客类似,本文第二部分为代码的分解介绍,第三部分为完整代码。一些在上述博客介绍过的内容,在本文中就省略了,大家如果有需要可以先查看上述文章TensorFlow DNNRegressor实现深度学习的代码

  相关版本信息:Python版本:3.8.5TensorFlow版本:2.4.1;编译器版本:Spyder 4.1.5

2 代码分解介绍

2.1 准备工作

  首先需要引入相关的库与包。

import os
import glob
import openpyxl
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import scipy.stats as stats
import matplotlib.pyplot as plt
from sklearn import metrics
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import regularizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers.experimental import preprocessing

  由于后续代码执行过程中,会有很多数据的展示与输出,其中多数数据都带有小数部分;为了让程序所显示的数据更为整齐、规范,我们可以对代码的浮点数、数组与NumPy对象对应的显示规则加以约束。

np.set_printoptions(precision=4,suppress=True)

  其中,precision设置小数点后显示的位数,默认为8suppress表示是否使用定点计数法(即与科学计数法相对)。

2.2 参数配置

  深度学习代码一大特点即为具有较多的参数需要我们手动定义。为避免调参时上下翻找,我们可以将主要的参数集中在一起,方便我们后期调整。

  其中,具体参数的含义在本文后续部分详细介绍。

# Input parameters.
DataPath="G:/CropYield/03_DL/00_Data/AllDataAll.csv"
ModelPath="G:/CropYield/03_DL/02_DNNModle"
CheckPointPath="G:/CropYield/03_DL/02_DNNModle/Weights"
CheckPointName=CheckPointPath+"/Weights_epoch:03d_val_loss:.4f.hdf5"
ParameterPath="G:/CropYield/03_DL/03_OtherResult/ParameterResult.xlsx"
TrainFrac=0.8
RandomSeed=np.random.randint(low=21,high=22)
CheckPointMethod=val_loss
HiddenLayer=[64,128,256,512,512,1024,1024]
RegularizationFactor=0.0001
ActivationMethod=relu
DropoutValue=[0.5,0.5,0.5,0.3,0.3,0.3,0.2]
OutputLayerActMethod=linear
LossMethod=mean_absolute_error
LearnRate=0.005
LearnDecay=0.0005
FitEpoch=500
BatchSize=9999
ValFrac=0.2
BestEpochOptMethod=adam

2.3 数据导入与数据划分

  我的数据已经保存在了.csv文件中,因此可以用pd.read_csv直接读取。

  其中,数据的每一列是一个特征,每一行是全部特征与因变量(就是下面的Yield)组合成的样本。

# Fetch and divide data.
MyData=pd.read_csv(DataPath,names=[EVI0610,EVI0626,EVI0712,EVI0728,EVI0813,EVI0829,
                                   EVI0914,EVI0930,EVI1016,Lrad06,Lrad07,Lrad08,
                                   Lrad09,Lrad10,Prec06,Prec07,Prec08,Prec09,
                                   Prec10,Pres06,Pres07,Pres08,Pres09,Pres10,
                                   SIF161,SIF177,SIF193,SIF209,SIF225,SIF241,
                                   SIF257,SIF273,SIF289,Shum06,Shum07,Shum08,
                                   Shum09,Shum10,SoilType,Srad06,Srad07,Srad08,
                                   Srad09,Srad10,Temp06,Temp07,Temp08,Temp09,
                                   Temp10,Wind06,Wind07,Wind08,Wind09,Wind10,
                                   Yield],header=0)

  随后,对导入的数据划分训练集与测试集。

TrainData=MyData.sample(frac=TrainFrac,random_state=RandomSeed)
TestData=MyData.drop(TrainData.index)

  其中,TrainFrac为训练集(包括验证数据)所占比例,RandomSeed为随即划分数据时所用的随机数种子。

2.4 联合分布图绘制

  在开始深度学习前,我们可以分别对输入数据的不同特征与因变量的关系加以查看。绘制联合分布图就是一种比较好的查看多个变量之间关系的方法。我们用seaborn来实现这一过程。seaborn是一个基于matplotlibPython数据可视化库,使得我们可以通过较为简单的操作,绘制出动人的图片。代码如下:

# Draw the joint distribution image.
def JointDistribution(Factors):
    plt.figure(1)
    sns.pairplot(TrainData[Factors],kind=reg,diag_kind=kde)
    sns.set(font_scale=2.0)
    DataDistribution=TrainData.describe().transpose()
    
# Draw the joint distribution image.
JointFactor=[Lrad07,Prec06,SIF161,Shum06,Srad07,Srad08,Srad10,Temp06,Yield]
JointDistribution(JointFactor)

  其中,JointFactor为需要绘制联合分布图的特征名称,JointDistribution函数中的kind表示联合分布图中非对角线图的类型,可选regscatterkdehistreg代表在图片中加入一条拟合直线,scatter就是不加入这条直线,kde是等高线的形式,hist就是类似于栅格地图的形式;diag_kind表示联合分布图中对角线图的类型,可选histkdehist代表直方图,kde代表直方图曲线化。font_scale是图中的字体大小。JointDistribution函数中最后一句是用来展示TrainData中每一项特征数据的统计信息,包括最大值、最小值、平均值、分位数等。

  图片绘制的示例如下:

  要注意,绘制联合分布图比较慢,建议大家不要选取太多的变量,否则程序会卡在这里比较长的时间。

2.5 因变量分离与数据标准化

  因变量分离我们就不再多解释啦;接下来,我们要知道,对于机器学习、深度学习而言,数据标准化是十分重要的——用官网所举的一个例子:不同的特征在神经网络中会乘以相同的权重weight,因此输入数据的尺度(即数据不同特征之间的大小关系)将会影响到输出数据与梯度的尺度;因此,数据标准化可以使得模型更加稳定。

  在这里,首先说明数据标准化与归一化的区别。

  标准化即将训练集中某列的值缩放成均值为0,方差为1的状态;而归一化是将训练集中某列的值缩放到01之间。而在机器学习中,标准化较之归一化通常具有更高的使用频率,且标准化后的数据在神经网络训练时,其收敛将会更快。

  最后,一定要记得——标准化时只需要对训练集数据加以处理,不要把测试集Test的数据引入了!因为标准化只需要对训练数据加以处理,引入测试集反而会影响标准化的作用。

# Separate independent and dependent variables.
TrainX=TrainData.copy(deep=True)
TestX=TestData.copy(deep=True)
TrainY=TrainX.pop(Yield)
TestY=TestX.pop(Yield)

# Standardization data.
Normalizer=preprocessing.Normalization()
Normalizer.adapt(np.array(TrainX))

  在这里,我们直接运用preprocessing.Normalization()建立一个预处理层,其具有数据标准化的功能;随后,通过.adapt()函数将需要标准化的数据(即训练集的自变量)放入这一层,便可以实现数据的标准化操作。

2.6 原有模型删除

  我们的程序每执行一次,便会在指定路径中保存当前运行的模型。为保证下一次模型保存时不受上一次模型运行结果干扰,我们可以将模型文件夹内的全部文件删除。

# Delete the model result from the last run.
def DeleteOldModel(ModelPath):
    AllFileName=os.listdir(ModelPath)
    for i in AllFileName:
        NewPath=os.path.join(ModelPath,i)
        if os.path.isdir(NewPath):
            DeleteOldModel(NewPath)
        else:
            os.remove(NewPath)
          
# Delete the model result from the last run.
DeleteOldModel(ModelPath)

  这一部分的代码在文章TensorFlow DNNRegressor实现深度学习的代码有详细的讲解,这里就不再重复。

2.7 最优Epoch保存与读取

  在我们训练模型的过程中,会让模型运行几百个Epoch(一个Epoch即全部训练集数据样本均进入模型训练一次);而由于每一次的Epoch所得到的精度都不一样,那么我们自然需要挑出几百个Epoch中最优秀的那一个Epoch

# Find and save optimal epoch.
def CheckPoint(Name):
    Checkpoint=ModelCheckpoint(Name,
                               monitor=CheckPointMethod,
                               verbose=1,
                               save_best_only=True,
                               mode=auto)
    CallBackList=[Checkpoint]
    return CallBackList

# Find and save optimal epochs.
CallBack=CheckPoint(CheckPointName)

  其中,Name就是保存Epoch的路径与文件名命名方法;monitor是我们挑选最优Epoch的依据,在这里我们用验证集数据对应的误差来判断这个Epoch是不是我们想要的;verbose用来设置输出日志的内容,我们用1就好;save_best_only用来确定我们是否只保存被认定为最优的Epochmode用以判断我们的monitor是越大越好还是越小越好,前面提到了我们的monitor是验证集数据对应的误差,那么肯定是误差越小越好,所以这里可以用automin,其中auto是模型自己根据用户选择的monitor方法来判断越大越好还是越小越好。

  找到最优Epoch后,将其传递给CallBack。需要注意的是,这里的最优Epoch是多个Epoch——因为每一次Epoch只要获得了当前模型所遇到的最优解,它就会保存;下一次再遇见一个更好的解时,同样保存,且不覆盖上一次的Epoch。可以这么理解,假如一共有三次Epoch,所得到的误差分别为574;那么我们保存的Epoch就是第一次和第三次。

2.8 模型构建

  Keras接口下的模型构建就很清晰明了了。相信大家在看了前期一篇文章TensorFlow DNNRegressor实现深度学习的代码后,结合代码旁的注释就理解啦。

# Build DNN model.
def BuildModel(Norm):
    Model=keras.Sequential([Norm, # 数据标准化层
                            
                            layers.Dense(HiddenLayer[0], # 指定隐藏层1的神经元个数
                                         kernel_regularizer=regularizers.l2(RegularizationFactor), # 运用L2正则化
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(), # 引入LeakyReLU这一改良的ReLU激活函数,从而加快模型收敛,减少过拟合
                            layers.BatchNormalization(), # 引入Batch Normalizing,加快网络收敛与增强网络稳固性
                            layers.Dropout(DropoutValue[0]), # 指定隐藏层1的Dropout值
                            
                            layers.Dense(HiddenLayer[1],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[1]),
                            
                            layers.Dense(HiddenLayer[2],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[2]),
                            
                            layers.Dense(HiddenLayer[3],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[3]),
                            
                            layers.Dense(HiddenLayer[4],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[4]),
                            
                            layers.Dense(HiddenLayer[5],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[5]),
                            
                            layers.Dense(HiddenLayer[6],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            # If batch normalization is set in the last hidden layer, the error image
                            # will show a trend of first stable and then decline; otherwise, it will 
                            # decline and then stable.
                            # layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[6]),
                            
                            layers.Dense(units=1,
                                         activation=OutputLayerActMethod)]) # 最后一层就是输出层
    Model.compile(loss=LossMethod, # 指定每个批次训练误差的减小方法
                  optimizer=tf.keras.optimizers.Adam(learning_rate=LearnRate,decay=LearnDecay)) 
                  # 运用学习率下降的优化方法
    return Model
   
# Build DNN regression model.
DNNModel=BuildModel(Normalizer)
DNNModel.summary()
DNNHistory=DNNModel.fit(TrainX,
                        TrainY,
                        epochs=FitEpoch,
                        # batch_size=BatchSize,
                        verbose=1,
                        callbacks=CallBack,
                        validation_split=ValFrac)

  在这里,.summary()查看模型摘要,validation_split为在训练数据中,取出ValFrac所指定比例的一部分作为验证数据。DNNHistory则记录了模型训练过程中的各类指标变化情况,接下来我们可以基于其绘制模型训练过程的误差变化图像。

2.9 训练图像绘制

  机器学习中,过拟合是影响训练精度的重要因素。因此,我们最好在训练模型的过程中绘制训练数据、验证数据的误差变化图象,从而更好获取模型的训练情况。

# Draw error image.
def LossPlot(History):
    plt.figure(2)
    plt.plot(History.history[loss],label=loss)
    plt.plot(History.history[val_loss],label=val_loss)
    plt.ylim([0,4000])
    plt.xlabel(Epoch)
    plt.ylabel(Error)
    plt.legend()
    plt.grid(True)

# Draw error image.
LossPlot(DNNHistory)

  其中,lossval_loss分别是模型训练过程中,训练集、验证集对应的误差;如果训练集误差明显小于验证集误差,就说明模型出现了过拟合。

2.10 最优Epoch选取

  前面提到了,我们将多个符合要求的Epoch保存在了指定的路径下,那么最终我们可以从中选取最好的那个Epoch,作为模型的最终参数,从而对测试集数据加以预测。那么在这里,我们需要将这一全局最优Epoch选取出,并带入到最终的模型里。

# Optimize the model based on optimal epoch.
def BestEpochIntoModel(Path,Model):
    EpochFile=glob.glob(Path+/*)
    BestEpoch=max(EpochFile,key=os.path.getmtime)
    Model.load_weights(BestEpoch)
    Model.compile(loss=LossMethod,
                  optimizer=BestEpochOptMethod)
    return Model

# Optimize the model based on optimal epoch.
DNNModel=BestEpochIntoModel(CheckPointPath,DNNModel)

  总的来说,这里就是运用了os.path.getmtime模块,将我们存储Epoch的文件夹中最新的那个Epoch挑出来——这一Epoch就是使得验证集数据误差最小的全局最优Epoch;并通过load_weights将这一Epoch对应的模型参数引入模型。

2.11 模型测试、拟合图像绘制、精度验证与模型参数与结果保存

  前期一篇文章TensorFlow DNNRegressor实现深度学习的代码中有相关的代码讲解内容,因此这里就不再赘述啦。

# Draw Test image.
def TestPlot(TestY,TestPrediction):
    plt.figure(3)
    ax=plt.axes(aspect=equal)
    plt.scatter(TestY,TestPrediction)
    plt.xlabel(True Values)
    plt.ylabel(Predictions)
    Lims=[0,10000]
    plt.xlim(Lims)
    plt.ylim(Lims)
    plt.plot(Lims,Lims)
    plt.grid(False)

# Verify the accuracy and draw error hist image.
def AccuracyVerification(TestY,TestPrediction):
    DNNError=TestPrediction-TestY
    plt.figure(4)
    plt.hist(DNNError,bins=30)
    plt.xlabel(Prediction Error)
    plt.ylabel(Count)
    plt.grid(False)
    Pearsonr=stats.pearsonr(TestY,TestPrediction)
    R2=metrics.r2_score(TestY,TestPrediction)
    RMSE=metrics.mean_squared_error(TestY,TestPrediction)**0.5
    print(Pearson correlation coefficient is 0, and RMSE is 1..format(Pearsonr[0],RMSE))
    return (Pearsonr[0],R2,RMSE)

# Save key parameters.
def WriteAccuracy(*WriteVar):
    ExcelData=openpyxl.load_workbook(WriteVar[0])
    SheetName=ExcelData.get_sheet_names()
    WriteSheet=ExcelData.get_sheet_by_name(SheetName[0])
    WriteSheet=ExcelData.active
    MaxRowNum=WriteSheet.max_row
    for i in range(len(WriteVar)-1):
        exec("WriteSheet.cell(MaxRowNum+1,i+1).value=WriteVar[i+1]")
    ExcelData.save(WriteVar[0])

# Predict test set data.
TestPrediction=DNNModel.predict(TestX).flatten()

# Draw Test image.
TestPlot(TestY,TestPrediction)

# Verify the accuracy and draw error hist image.
AccuracyResult=AccuracyVerification(TestY,TestPrediction)
PearsonR,R2,RMSE=AccuracyResult[0],AccuracyResult[1],AccuracyResult[2]

# Save model and key parameters.
DNNModel.save(ModelPath)
WriteAccuracy(ParameterPath,PearsonR,R2,RMSE,TrainFrac,RandomSeed,CheckPointMethod,
              ,.join(%s %i for i in HiddenLayer),RegularizationFactor,
              ActivationMethod,,.join(%s %i for i in DropoutValue),OutputLayerActMethod,
              LossMethod,LearnRate,LearnDecay,FitEpoch,BatchSize,ValFrac,BestEpochOptMethod)

  得到拟合图像如下:

  得到误差分布直方图如下:

  至此,代码的分解介绍就结束啦~

3 完整代码

# -*- coding: utf-8 -*-
"""
Created on Tue Feb 24 12:42:17 2021

@author: fkxxgis
"""

import os
import glob
import openpyxl
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import scipy.stats as stats
import matplotlib.pyplot as plt
from sklearn import metrics
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import regularizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers.experimental import preprocessing

np.set_printoptions(precision=4,suppress=True)

# Draw the joint distribution image.
def JointDistribution(Factors):
    plt.figure(1)
    sns.pairplot(TrainData[Factors],kind=reg,diag_kind=kde)
    sns.set(font_scale=2.0)
    DataDistribution=TrainData.describe().transpose()

# Delete the model result from the last run.
def DeleteOldModel(ModelPath):
    AllFileName=os.listdir(ModelPath)
    for i in AllFileName:
        NewPath=os.path.join(ModelPath,i)
        if os.path.isdir(NewPath):
            DeleteOldModel(NewPath)
        else:
            os.remove(NewPath)

# Find and save optimal epoch.
def CheckPoint(Name):
    Checkpoint=ModelCheckpoint(Name,
                               monitor=CheckPointMethod,
                               verbose=1,
                               save_best_only=True,
                               mode=auto)
    CallBackList=[Checkpoint]
    return CallBackList

# Build DNN model.
def BuildModel(Norm):
    Model=keras.Sequential([Norm, # 数据标准化层
                            
                            layers.Dense(HiddenLayer[0], # 指定隐藏层1的神经元个数
                                         kernel_regularizer=regularizers.l2(RegularizationFactor), # 运用L2正则化
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(), # 引入LeakyReLU这一改良的ReLU激活函数,从而加快模型收敛,减少过拟合
                            layers.BatchNormalization(), # 引入Batch Normalizing,加快网络收敛与增强网络稳固性
                            layers.Dropout(DropoutValue[0]), # 指定隐藏层1的Dropout值
                            
                            layers.Dense(HiddenLayer[1],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[1]),
                            
                            layers.Dense(HiddenLayer[2],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[2]),
                            
                            layers.Dense(HiddenLayer[3],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[3]),
                            
                            layers.Dense(HiddenLayer[4],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[4]),
                            
                            layers.Dense(HiddenLayer[5],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[5]),
                            
                            layers.Dense(HiddenLayer[6],
                                         kernel_regularizer=regularizers.l2(RegularizationFactor),
                                         # activation=ActivationMethod
                                         ),
                            layers.LeakyReLU(),
                            # If batch normalization is set in the last hidden layer, the error image
                            # will show a trend of first stable and then decline; otherwise, it will 
                            # decline and then stable.
                            # layers.BatchNormalization(),
                            layers.Dropout(DropoutValue[6]),
                            
                            layers.Dense(units=1,
                                         activation=OutputLayerActMethod)]) # 最后一层就是输出层
    Model.compile(loss=LossMethod, # 指定每个批次训练误差的减小方法
                  optimizer=tf.keras.optimizers.Adam(learning_rate=LearnRate,decay=LearnDecay)) 
                  # 运用学习率下降的优化方法
    return Model

# Draw error image.
def LossPlot(History):
    plt.figure(2)
    plt.plot(History.history[loss],label=loss)
    plt.plot(History.history[val_loss],label=val_loss)
    plt.ylim([0,4000])
    plt.xlabel(Epoch)
    plt.ylabel(Error)
    plt.legend()
    plt.grid(True)

# Optimize the model based on optimal epoch.
def BestEpochIntoModel(Path,Model):
    EpochFile=glob.glob(Path+/*)
    BestEpoch=max(EpochFile,key=os.path.getmtime)
    Model.load_weights(BestEpoch)
    Model.compile(loss=LossMethod,
                  optimizer=BestEpochOptMethod)
    return Model

# Draw Test image.
def TestPlot(TestY,TestPrediction):
    plt.figure(3)
    ax=plt.axes(aspect=equal)
    plt.scatter(TestY,TestPrediction)
    plt.xlabel(True Values)
    plt.ylabel(Predictions)
    Lims=[0,10000]
    plt.xlim(Lims)
    plt.ylim(Lims)
    plt.plot(Lims,Lims)
    plt.grid(False)

# Verify the accuracy and draw error hist image.
def AccuracyVerification(TestY,TestPrediction):
    DNNError=TestPrediction-TestY
    plt.figure(4)
    plt.hist(DNNError,bins=30)
    plt.xlabel(Prediction Error)
    plt.ylabel(Count)
    plt.grid(False)
    Pearsonr=stats.pearsonr(TestY,TestPrediction)
    R2=metrics.r2_score(TestY,TestPrediction)
    RMSE=metrics.mean_squared_error(TestY,TestPrediction)**0.5
    print(Pearson correlation coefficient is 0, and RMSE is 1..format(Pearsonr[0],RMSE))
    return (Pearsonr[0],R2,RMSE)

# Save key parameters.
def WriteAccuracy(*WriteVar):
    ExcelData=openpyxl.load_workbook(WriteVar[0])
    SheetName=ExcelData.get_sheet_names()
    WriteSheet=ExcelData.get_sheet_by_name(SheetName[0])
    WriteSheet=ExcelData.active
    MaxRowNum=WriteSheet.max_row
    for i in range(len(WriteVar)-1):
        exec("WriteSheet.cell(MaxRowNum+1,i+1).value=WriteVar[i+1]")
    ExcelData.save(WriteVar[0])

# Input parameters.
DataPath="G:/CropYield/03_DL/00_Data/AllDataAll.csv"
ModelPath="G:/CropYield/03_DL/02_DNNModle"
CheckPointPath="G:/CropYield/03_DL/02_DNNModle/Weights"
CheckPointName=CheckPointPath+"/Weights_epoch:03d_val_loss:.4f.hdf5"
ParameterPath="G:/CropYield/03_DL/03_OtherResult/ParameterResult.xlsx"
TrainFrac=0.8
RandomSeed=np.random.randint(low=21,high=22)
CheckPointMethod=val_loss
HiddenLayer=[64,128,256,512,512,1024,1024]
RegularizationFactor=0.0001
ActivationMethod=relu
DropoutValue=[0.5,0.5,0.5,0.3,0.3,0.3,0.2]
OutputLayerActMethod=linear
LossMethod=mean_absolute_error
LearnRate=0.005
LearnDecay=0.0005
FitEpoch=500
BatchSize=9999
ValFrac=0.2
BestEpochOptMethod=adam

# Fetch and divide data.
MyData=pd.read_csv(DataPath,names=[EVI0610,EVI0626,EVI0712,EVI0728,EVI0813,EVI0829,
                                   EVI0914,EVI0930,EVI1016,Lrad06,Lrad07,Lrad08,
                                   Lrad09,Lrad10,Prec06,Prec07,Prec08,Prec09,
                                   Prec10,Pres06,Pres07,Pres08,Pres09,Pres10,
                                   SIF161,SIF177,SIF193,SIF209,SIF225,SIF241,
                                   SIF257,SIF273,SIF289,Shum06,Shum07,Shum08,
                                   Shum09,Shum10,SoilType,Srad06,Srad07,Srad08,
                                   Srad09,Srad10,Temp06,Temp07,Temp08,Temp09,
                                   Temp10,Wind06,Wind07,Wind08,Wind09,Wind10,
                                   Yield],header=0)
TrainData=MyData.sample(frac=TrainFrac,random_state=RandomSeed)
TestData=MyData.drop(TrainData.index)

# Draw the joint distribution image.
# JointFactor=[Lrad07,Prec06,SIF161,Shum06,Srad07,Srad08,Srad10,Temp06,Yield]
# JointDistribution(JointFactor)

# Separate independent and dependent variables.
TrainX=TrainData.copy(deep=True)
TestX=TestData.copy(deep=True)
TrainY=TrainX.pop(Yield)
TestY=TestX.pop(Yield)

# Standardization data.
Normalizer=preprocessing.Normalization()
Normalizer.adapt(np.array(TrainX))

# Delete the model result from the last run.
DeleteOldModel(ModelPath)

# Find and save optimal epochs.
CallBack=CheckPoint(CheckPointName)

# Build DNN regression model.
DNNModel=BuildModel(Normalizer)
DNNModel.summary()
DNNHistory=DNNModel.fit(TrainX,
                        TrainY,
                        epochs=FitEpoch,
                        # batch_size=BatchSize,
                        verbose=1,
                        callbacks=CallBack,
                        validation_split=ValFrac)

# Draw error image.
LossPlot(DNNHistory)

# Optimize the model based on optimal epoch.
DNNModel=BestEpochIntoModel(CheckPointPath,DNNModel)

# Predict test set data.
TestPrediction=DNNModel.predict(TestX).flatten()

# Draw Test image.
TestPlot(TestY,TestPrediction)

# Verify the accuracy and draw error hist image.
AccuracyResult=AccuracyVerification(TestY,TestPrediction)
PearsonR,R2,RMSE=AccuracyResult[0],AccuracyResult[1],AccuracyResult[2]

# Save model and key parameters.
DNNModel.save(ModelPath)
WriteAccuracy(ParameterPath,PearsonR,R2,RMSE,TrainFrac,RandomSeed,CheckPointMethod,
              ,.join(%s %i for i in HiddenLayer),RegularizationFactor,
              ActivationMethod,,.join(%s %i for i in DropoutValue),OutputLayerActMethod,
              LossMethod,LearnRate,LearnDecay,FitEpoch,BatchSize,ValFrac,BestEpochOptMethod)

  至此,大功告成。

python 使用TensorFlow去噪自动编码器实现。

models_dir = 'models/'  # dir to save/restore models
data_dir = 'data/'  # directory to store algorithm data
summary_dir = 'logs/'  # directory to store tensorflow summaries
from scipy import misc
import tensorflow as tf
import numpy as np

# ############# #
#   Utilities   #
# ############# #


def xavier_init(fan_in, fan_out, const=1):
    """ Xavier initialization of network weights.
    https://stackoverflow.com/questions/33640581/how-to-do-xavier-initialization-on-tensorflow

    :param fan_in: fan in of the network (n_features)
    :param fan_out: fan out of the network (n_components)
    :param const: multiplicative constant
    """
    low = -const * np.sqrt(6.0 / (fan_in + fan_out))
    high = const * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out), minval=low, maxval=high)
    

def gen_batches(data, batch_size):
    """ Divide input data into batches.

    :param data: input data
    :param batch_size: size of each batch

    :return: data divided into batches
    """
    data = np.array(data)

    for i in range(0, data.shape[0], batch_size):
        yield data[i:i+batch_size]


def masking_noise(X, v):
    """ Apply masking noise to data in X, in other words a fraction v of elements of X
    (chosen at random) is forced to zero.

    :param X: array_like, Input data
    :param v: int, fraction of elements to distort

    :return: transformed data
    """
    X_noise = X.copy()

    n_samples = X.shape[0]
    n_features = X.shape[1]

    for i in range(n_samples):
        mask = np.random.randint(0, n_features, v)

        for m in mask:
            X_noise[i][m] = 0.

    return X_noise


def salt_and_pepper_noise(X, v):
    """ Apply salt and pepper noise to data in X, in other words a fraction v of elements of X
    (chosen at random) is set to its maximum or minimum value according to a fair coin flip.
    If minimum or maximum are not given, the min (max) value in X is taken.

    :param X: array_like, Input data
    :param v: int, fraction of elements to distort

    :return: transformed data
    """
    X_noise = X.copy()
    n_features = X.shape[1]

    mn = X.min()
    mx = X.max()

    for i, sample in enumerate(X):
        mask = np.random.randint(0, n_features, v)

        for m in mask:

            if np.random.random() < 0.5:
                X_noise[i][m] = mn
            else:
                X_noise[i][m] = mx

    return X_noise
    
    
  def masking_noise(X, v):
    """ Apply masking noise to data in X, in other words a fraction v of elements of X
    (chosen at random) is forced to zero.

    :param X: array_like, Input data
    :param v: int, fraction of elements to distort

    :return: transformed data
    """
    X_noise = X.copy()

    n_samples = X.shape[0]
    n_features = X.shape[1]

    for i in range(n_samples):
        mask = np.random.randint(0, n_features, v)

        for m in mask:
            X_noise[i][m] = 0.

    return X_noise


def salt_and_pepper_noise(X, v):
    """ Apply salt and pepper noise to data in X, in other words a fraction v of elements of X
    (chosen at random) is set to its maximum or minimum value according to a fair coin flip.
    If minimum or maximum are not given, the min (max) value in X is taken.

    :param X: array_like, Input data
    :param v: int, fraction of elements to distort

    :return: transformed data
    """
    X_noise = X.copy()
    n_features = X.shape[1]

    mn = X.min()
    mx = X.max()

    for i, sample in enumerate(X):
        mask = np.random.randint(0, n_features, v)

        for m in mask:

            if np.random.random() < 0.5:
                X_noise[i][m] = mn
            else:
                X_noise[i][m] = mx

    return X_noise


def gen_image(img, width, height, outfile, img_type='grey'):
    assert len(img) == width * height or len(img) == width * height * 3

    if img_type == 'grey':
        misc.imsave(outfile, img.reshape(width, height))

    elif img_type == 'color':
        misc.imsave(outfile, img.reshape(3, width, height))
import tensorflow as tf

import autoencoder
import datasets

# #################### #
#   Flags definition   #
# #################### #
flags = tf.app.flags
FLAGS = flags.FLAGS

# Global configuration
flags.DEFINE_string('model_name', '', 'Model name.')
flags.DEFINE_string('dataset', 'mnist', 'Which dataset to use. ["mnist", "cifar10"]')
flags.DEFINE_string('cifar_dir', '', 'Path to the cifar 10 dataset directory.')
flags.DEFINE_integer('seed', -1, 'Seed for the random generators (>= 0). Useful for testing hyperparameters.')
flags.DEFINE_boolean('restore_previous_model', False, 'If true, restore previous model corresponding to model name.')
flags.DEFINE_boolean('encode_train', False, 'Whether to encode and store the training set.')
flags.DEFINE_boolean('encode_valid', False, 'Whether to encode and store the validation set.')
flags.DEFINE_boolean('encode_test', False, 'Whether to encode and store the test set.')


# Stacked Denoising Autoencoder specific parameters
flags.DEFINE_integer('n_components', 256, 'Number of hidden units in the dae.')
flags.DEFINE_string('corr_type', 'none', 'Type of input corruption. ["none", "masking", "salt_and_pepper"]')
flags.DEFINE_float('corr_frac', 0., 'Fraction of the input to corrupt.')
flags.DEFINE_integer('xavier_init', 1, 'Value for the constant in xavier weights initialization.')
flags.DEFINE_string('enc_act_func', 'tanh', 'Activation function for the encoder. ["sigmoid", "tanh"]')
flags.DEFINE_string('dec_act_func', 'none', 'Activation function for the decoder. ["sigmoid", "tanh", "none"]')
flags.DEFINE_string('main_dir', 'dae/', 'Directory to store data relative to the algorithm.')
flags.DEFINE_string('loss_func', 'mean_squared', 'Loss function. ["mean_squared" or "cross_entropy"]')
flags.DEFINE_integer('verbose', 0, 'Level of verbosity. 0 - silent, 1 - print accuracy.')
flags.DEFINE_integer('weight_images', 0, 'Number of weight images to generate.')
flags.DEFINE_string('opt', 'gradient_descent', '["gradient_descent", "ada_grad", "momentum"]')
flags.DEFINE_float('learning_rate', 0.01, 'Initial learning rate.')
flags.DEFINE_float('momentum', 0.5, 'Momentum parameter.')
flags.DEFINE_integer('num_epochs', 10, 'Number of epochs.')
flags.DEFINE_integer('batch_size', 10, 'Size of each mini-batch.')

assert FLAGS.dataset in ['mnist', 'cifar10']
assert FLAGS.enc_act_func in ['sigmoid', 'tanh']
assert FLAGS.dec_act_func in ['sigmoid', 'tanh', 'none']
assert FLAGS.corr_type in ['masking', 'salt_and_pepper', 'none']
assert 0. <= FLAGS.corr_frac <= 1.
assert FLAGS.loss_func in ['cross_entropy', 'mean_squared']
assert FLAGS.opt in ['gradient_descent', 'ada_grad', 'momentum']

if __name__ == '__main__':

    if FLAGS.dataset == 'mnist':

        # ################# #
        #   MNIST Dataset   #
        # ################# #

        trX, vlX, teX = datasets.load_mnist_dataset(mode='unsupervised')

    elif FLAGS.dataset == 'cifar10':

        # ################### #
        #   Cifar10 Dataset   #
        # ################### #

        trX, teX = datasets.load_cifar10_dataset(FLAGS.cifar_dir, mode='unsupervised')
        vlX = teX[:5000]  # Validation set is the first half of the test set

    else:  # cannot be reached, just for completeness
        trX = None
        vlX = None
        teX = None

    # Create the object
    dae = autoencoder.DenoisingAutoencoder(
        seed=FLAGS.seed, model_name=FLAGS.model_name, n_components=FLAGS.n_components,
        enc_act_func=FLAGS.enc_act_func, dec_act_func=FLAGS.dec_act_func, xavier_init=FLAGS.xavier_init,
        corr_type=FLAGS.corr_type, corr_frac=FLAGS.corr_frac, dataset=FLAGS.dataset,
        loss_func=FLAGS.loss_func, main_dir=FLAGS.main_dir, opt=FLAGS.opt,
        learning_rate=FLAGS.learning_rate, momentum=FLAGS.momentum,
        verbose=FLAGS.verbose, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size)

    # Fit the model
    dae.fit(trX, teX, restore_previous_model=FLAGS.restore_previous_model)

    # Encode the training data and store it
    dae.transform(trX, name='train', save=FLAGS.encode_train)
    dae.transform(vlX, name='validation', save=FLAGS.encode_valid)
    dae.transform(teX, name='test', save=FLAGS.encode_test)

    # save images
    dae.get_weights_as_images(28, 28, max_images=FLAGS.weight_images)

import tensorflow as tf
import numpy as np
import os

import zconfig
import utils

class DenoisingAutoencoder(object):

    """ Implementation of Denoising Autoencoders using TensorFlow.
    The interface of the class is sklearn-like.
    """

    def __init__(self, model_name='dae', n_components=256, main_dir='dae/', enc_act_func='tanh',
                 dec_act_func='none', loss_func='mean_squared', num_epochs=10, batch_size=10, dataset='mnist',
                 xavier_init=1, opt='gradient_descent', learning_rate=0.01, momentum=0.5, corr_type='none',
                 corr_frac=0., verbose=1, seed=-1):
        """
        :param main_dir: main directory to put the models, data and summary directories
        :param n_components: number of hidden units
        :param enc_act_func: Activation function for the encoder. ['tanh', 'sigmoid']
        :param dec_act_func: Activation function for the decoder. ['tanh', 'sigmoid']
        :param loss_func: Loss function. ['mean_squared', 'cross_entropy']
        :param xavier_init: Value of the constant for xavier weights initialization
        :param opt: Which tensorflow optimizer to use. ['gradient_descent', 'momentum', 'ada_grad']
        :param learning_rate: Initial learning rate
        :param momentum: Momentum parameter
        :param corr_type: Type of input corruption. ["none", "masking", "salt_and_pepper"]
        :param corr_frac: Fraction of the input to corrupt.
        :param verbose: Level of verbosity. 0 - silent, 1 - print accuracy.
        :param num_epochs: Number of epochs
        :param batch_size: Size of each mini-batch
        :param dataset: Optional name for the dataset.
        :param seed: positive integer for seeding random generators. Ignored if < 0.
        """

        self.model_name = model_name
        self.n_components = n_components
        self.main_dir = main_dir
        self.enc_act_func = enc_act_func
        self.dec_act_func = dec_act_func
        self.loss_func = loss_func
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.dataset = dataset
        self.xavier_init = xavier_init
        self.opt = opt
        self.learning_rate = learning_rate
        self.momentum = momentum
        self.corr_type = corr_type
        self.corr_frac = corr_frac
        self.verbose = verbose
        self.seed = seed

        if self.seed >= 0:
            np.random.seed(self.seed)
            tf.set_random_seed(self.seed)

        self.models_dir, self.data_dir, self.tf_summary_dir = self._create_data_directories()
        self.model_path = self.models_dir + self.model_name

        self.input_data = None
        self.input_data_corr = None

        self.W_ = None
        self.bh_ = None
        self.bv_ = None

        self.encode = None
        self.decode = None

        self.train_step = None
        self.cost = None

        self.tf_session = None
        self.tf_merged_summaries = None
        self.tf_summary_writer = None
        self.tf_saver = None

    def fit(self, train_set, validation_set=None, restore_previous_model=False):
        """ Fit the model to the data.

        :param train_set: Training data.
        :param validation_set: optional, default None. Validation data.
        :param restore_previous_model:
                    if true, a previous trained model
                    with the same name of this model is restored from disk to continue training.

        :return: self
        """
        n_features = train_set.shape[1]

        self._build_model(n_features)

        with tf.Session() as self.tf_session:

            self._initialize_tf_utilities_and_ops(restore_previous_model)
            self._train_model(train_set, validation_set)
            self.tf_saver.save(self.tf_session, self.models_dir + self.model_name)

    def _initialize_tf_utilities_and_ops(self, restore_previous_model):

        """ Initialize TensorFlow operations: summaries, init operations, saver, summary_writer.
        Restore a previously trained model if the flag restore_previous_model is true.
        """

        self.tf_merged_summaries = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        self.tf_session.run(init_op)

        if restore_previous_model:
            self.tf_saver.restore(self.tf_session, self.model_path)

        self.tf_summary_writer = tf.train.SummaryWriter(self.tf_summary_dir, self.tf_session.graph_def)

    def _train_model(self, train_set, validation_set):

        """Train the model.

        :param train_set: training set
        :param validation_set: validation set. optional, default None

        :return: self
        """

        corruption_ratio = np.round(self.corr_frac * train_set.shape[1]).astype(np.int)

        for i in range(self.num_epochs):

            self._run_train_step(train_set, corruption_ratio)

            if i % 5 == 0:
                if validation_set is not None:
                    self._run_validation_error_and_summaries(i, validation_set)

    def _run_train_step(self, train_set, corruption_ratio):

        """ Run a training step. A training step is made by randomly corrupting the training set,
        randomly shuffling it,  divide it into batches and run the optimizer for each batch.

        :param train_set: training set
        :param corruption_ratio: fraction of elements to corrupt

        :return: self
        """
        x_corrupted = self._corrupt_input(train_set, corruption_ratio)

        shuff = zip(train_set, x_corrupted)
        np.random.shuffle(shuff)

        batches = [_ for _ in utils.gen_batches(shuff, self.batch_size)]

        for batch in batches:
            x_batch, x_corr_batch = zip(*batch)
            tr_feed = {self.input_data: x_batch, self.input_data_corr: x_corr_batch}
            self.tf_session.run(self.train_step, feed_dict=tr_feed)

    def _corrupt_input(self, data, v):

        """ Corrupt a fraction 'v' of 'data' according to the
        noise method of this autoencoder.
        :return: corrupted data
        """

        if self.corr_type == 'masking':
            x_corrupted = utils.masking_noise(data, v)

        elif self.corr_type == 'salt_and_pepper':
            x_corrupted = utils.salt_and_pepper_noise(data, v)

        elif self.corr_type == 'none':
            x_corrupted = data

        else:
            x_corrupted = None

        return x_corrupted

    def _run_validation_error_and_summaries(self, epoch, validation_set):

        """ Run the summaries and error computation on the validation set.

        :param epoch: current epoch
        :param validation_set: validation data

        :return: self
        """

        vl_feed = {self.input_data: validation_set, self.input_data_corr: validation_set}
        result = self.tf_session.run([self.tf_merged_summaries, self.cost], feed_dict=vl_feed)
        summary_str = result[0]
        err = result[1]

        self.tf_summary_writer.add_summary(summary_str, epoch)

        if self.verbose == 1:
            print("Validation cost at step %s: %s" % (epoch, err))

    def _build_model(self, n_features):
        """ Creates the computational graph.

        :type n_features: int
        :param n_features: Number of features.

        :return: self
        """

        self.input_data, self.input_data_corr = self._create_placeholders(n_features)
        self.W_, self.bh_, self.bv_ = self._create_variables(n_features)

        self._create_encode_layer()
        self._create_decode_layer()

        self._create_cost_function_node()
        self._create_train_step_node()

    def _create_placeholders(self, n_features):

        """ Create the TensorFlow placeholders for the model.

        :return: tuple(input_data(shape(None, n_features)),
                       input_data_corr(shape(None, n_features)))
        """

        input_data = tf.placeholder('float', [None, n_features], name='x-input')
        input_data_corr = tf.placeholder('float', [None, n_features], name='x-corr-input')

        return input_data, input_data_corr

    def _create_variables(self, n_features):

        """ Create the TensorFlow variables for the model.

        :return: tuple(weights(shape(n_features, n_components)),
                       hidden bias(shape(n_components)),
                       visible bias(shape(n_features)))
        """

        W_ = tf.Variable(utils.xavier_init(n_features, self.n_components, self.xavier_init), name='enc-w')
        bh_ = tf.Variable(tf.zeros([self.n_components]), name='hidden-bias')
        bv_ = tf.Variable(tf.zeros([n_features]), name='visible-bias')

        return W_, bh_, bv_

    def _create_encode_layer(self):

        """ Create the encoding layer of the network.
        :return: self
        """

        with tf.name_scope("W_x_bh"):
            if self.enc_act_func == 'sigmoid':
                self.encode = tf.nn.sigmoid(tf.matmul(self.input_data_corr, self.W_) + self.bh_)

            elif self.enc_act_func == 'tanh':
                self.encode = tf.nn.tanh(tf.matmul(self.input_data_corr, self.W_) + self.bh_)

            else:
                self.encode = None

    def _create_decode_layer(self):

        """ Create the decoding layer of the network.
        :return: self
        """

        with tf.name_scope("Wg_y_bv"):
            if self.dec_act_func == 'sigmoid':
                self.decode = tf.nn.sigmoid(tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_)

            elif self.dec_act_func == 'tanh':
                self.decode = tf.nn.tanh(tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_)

            elif self.dec_act_func == 'none':
                self.decode = tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_

            else:
                self.decode = None

    def _create_cost_function_node(self):

        """ create the cost function node of the network.
        :return: self
        """

        with tf.name_scope("cost"):
            if self.loss_func == 'cross_entropy':
                self.cost = - tf.reduce_sum(self.input_data * tf.log(self.decode))
                _ = tf.scalar_summary("cross_entropy", self.cost)

            elif self.loss_func == 'mean_squared':
                self.cost = tf.sqrt(tf.reduce_mean(tf.square(self.input_data - self.decode)))
                _ = tf.scalar_summary("mean_squared", self.cost)

            else:
                self.cost = None

    def _create_train_step_node(self):

        """ create the training step node of the network.
        :return: self
        """

        with tf.name_scope("train"):
            if self.opt == 'gradient_descent':
                self.train_step = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.cost)

            elif self.opt == 'ada_grad':
                self.train_step = tf.train.AdagradOptimizer(self.learning_rate).minimize(self.cost)

            elif self.opt == 'momentum':
                self.train_step = tf.train.MomentumOptimizer(self.learning_rate, self.momentum).minimize(self.cost)

            else:
                self.train_step = None

    def transform(self, data, name='train', save=False):
        """ Transform data according to the model.

        :param data: Data to transform
        :param name: Identifier for the data that is being encoded
        :param save: If true, save data to disk

        :return: transformed data
        """

        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)

            encoded_data = self.encode.eval({self.input_data_corr: data})

            if save:
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data

    def load_model(self, shape, model_path):
        """ Restore a previously trained model from disk.

        :param shape: tuple(n_features, n_components)
        :param model_path: path to the trained model

        :return: self, the trained model
        """
        self.n_components = shape[1]

        self._build_model(shape[0])

        init_op = tf.initialize_all_variables()

        self.tf_saver = tf.train.Saver()

        with tf.Session() as self.tf_session:

            self.tf_session.run(init_op)

            self.tf_saver.restore(self.tf_session, model_path)

    def get_model_parameters(self):
        """ Return the model parameters in the form of numpy arrays.

        :return: model parameters
        """
        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)

            return {
                'enc_w': self.W_.eval(),
                'enc_b': self.bh_.eval(),
                'dec_b': self.bv_.eval()
            }

    def _create_data_directories(self):

        """ Create the three directories for storing respectively the models,
        the data generated by training and the TensorFlow's summaries.

        :return: tuple of strings(models_dir, data_dir, summary_dir)
        """

        self.main_dir = self.main_dir + '/' if self.main_dir[-1] != '/' else self.main_dir

        models_dir = zconfig.models_dir + self.main_dir
        data_dir = zconfig.data_dir + self.main_dir
        summary_dir = zconfig.summary_dir + self.main_dir

        for d in [models_dir, data_dir, summary_dir]:
            if not os.path.isdir(d):
                os.mkdir(d)

        return models_dir, data_dir, summary_dir

    def get_weights_as_images(self, width, height, outdir='img/', max_images=10, model_path=None):
        """ Save the weights of this autoencoder as images, one image per hidden unit.
        Useful to visualize what the autoencoder has learned.

        :type width: int
        :param width: Width of the images

        :type height: int
        :param height: Height of the images

        :type outdir: string, default 'data/sdae/img'
        :param outdir: Output directory for the images. This path is appended to self.data_dir

        :type max_images: int, default 10
        :param max_images: Number of images to return.
        """
        assert max_images <= self.n_components

        outdir = self.data_dir + outdir

        if not os.path.isdir(outdir):
            os.mkdir(outdir)

        with tf.Session() as self.tf_session:

            if model_path is not None:
                self.tf_saver.restore(self.tf_session, model_path)
            else:
                self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)

            enc_weights = self.W_.eval()

            perm = np.random.permutation(self.n_components)[:max_images]

            for p in perm:

                enc_w = np.array([i[p] for i in enc_weights])
                image_path = outdir + self.model_name + '-enc_weights_{}.png'.format(p)
                utils.gen_image(enc_w, width, height, image_path)

以上是关于Python TensorFlow实现Sequential深度神经网络回归的主要内容,如果未能解决你的问题,请参考以下文章

python_tensorflow_Django实现逻辑回归

在 C/C++ 中使用 TensorFlow 预训练好的模型—— 间接调用 Python 实现

5.6 tensorflow2实现奇异值分解(SVD)——python实战(下篇)

吴裕雄--天生自然python Google深度学习框架:Tensorflow实现迁移学习

Python TensorFlow实现Sequential深度神经网络回归

5.8. tensorflow2实现SVD推荐系统——python实战 (下篇)