在子类模型中使用带有自定义 train_step 的 Keras ImageDataGenerator 时出错
Posted
技术标签:
【中文标题】在子类模型中使用带有自定义 train_step 的 Keras ImageDataGenerator 时出错【英文标题】:Error using Keras ImageDataGenerator with custom train_step in subclassed model 【发布时间】:2021-03-04 02:27:50 【问题描述】:我正在尝试使用 keras 编写知识蒸馏模型。我从 keras 示例 here 开始。为了训练模型,覆盖了 train_step 和 test_step 方法。与 keras 示例不同,我想使用 ImageDataGenerator 对象来拟合模型,以预处理 CIFAR10 数据集中的图像。问题是,每当我调用传递 X_train 和 Y_train 的 model.fit 函数时,训练工作正常,如果我调用 model.fit 传递 ImageDataGenerator.flow(X_train, Y_train, batch_size) 代码返回以下错误:
NotImplementedError:子类化Model类时,应该实现调用方法。
我也尝试过修改 train_step 处理它接收到的数据输入的方式,但似乎到目前为止还没有任何方法奏效。
为什么会这样?用 ImageDataGenereator 对象覆盖 Model 类的 train_step 方法有什么问题吗?类Model的fit方法也应该被覆盖吗?
为了让事情变得清晰和可重现,这里是示例代码:
import time
import copy
import tensorflow as tf
import keras
from keras import regularizers
from keras.engine import Model
from keras.layers import Dropout, Flatten, Dense, Conv2D, MaxPooling2D, Activation, BatchNormalization
from keras.models import Sequential
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import np_utils
from tensorflow.python.keras.engine import data_adapter
# Imported from files
import settings_parser
from utils import progressive_learning_rate
from teacher import Teacher, build_teacher
from student import Student, build_student
class Distiller(tf.keras.Model):
def __init__(self, student, teacher):
super(Distiller, self).__init__()
self.teacher = teacher
self.student = student
def compile(self, optimizer, metrics, student_loss_fn, distillation_loss_fn, alpha=0.1, temperature=3):
""" Configure the distiller.
Args:
optimizer: Keras optimizer for the student weights
metrics: Keras metrics for evaluation
student_loss_fn: Loss function of difference between student
predictions and ground-truth
distillation_loss_fn: Loss function of difference between soft
student predictions and soft teacher predictions
alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
temperature: Temperature for softening probability distributions.
Larger temperature gives softer distributions.
"""
super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
self.student_loss_fn = student_loss_fn
self.distillation_loss_fn = distillation_loss_fn
self.alpha = alpha
self.temperature = temperature
# @tf.function
def train_step(self, data):
# Treat data in different ways if it is a tuple or an iterator
x = None
y = None
if isinstance(data, tuple):
x, y = data
if isinstance(data, tf.keras.preprocessing.image.NumpyArrayIterator):
x, y = data.next()
# Forward pass of teacher
teacher_predictions = self.teacher(x, training=False)
with tf.GradientTape() as tape:
# Forward pass of student
student_predictions = self.student(x, training=True)
# Compute losses
student_loss = self.student_loss_fn(y, student_predictions)
distillation_loss = self.distillation_loss_fn(
tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
tf.nn.softmax(student_predictions / self.temperature, axis=1),
)
loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss
# Compute gradients
trainable_vars = self.student.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics configured in `compile()`.
self.compiled_metrics.update_state(y, student_predictions)
# Return a dict of performance
results = m.name: m.result() for m in self.metrics
results.update(
"student_loss": student_loss, "distillation_loss": distillation_loss
)
return results
# @tf.function
def test_step(self, data):
# Treat data in different ways if it is a tuple or an iterator
x = None
y = None
if isinstance(data, tuple):
x, y = data
if isinstance(data, tf.keras.preprocessing.image.NumpyArrayIterator):
x, y = data.next()
# Compute predictions
y_prediction = self.student(x, training=False)
# Calculate the loss
student_loss = self.student_loss_fn(y, y_prediction)
# Update the metrics.
self.compiled_metrics.update_state(y, y_prediction)
# Return a dict of performance
results = m.name: m.result() for m in self.metrics
results.update("student_loss": student_loss)
return results
#Define method to build the teacher model (VGG16)
def build_teacher():
input = keras.Input(shape=(32, 32, 3), name="img")
x = Conv2D(64, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(input)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.3)(x)
# Block 2
x = Conv2D(64, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
# Block 3
x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 4
x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
# Block 5
x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 6
x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 7
x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
# Block 8
x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 9
x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 10
x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
# Block 11
x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 12
x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 13
x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
x = Dropout(0.5)(x)
# Flatten and classification
x = Flatten()(x)
x = Dense(512)(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
# Out
x = Dense(10)(x)
output = Activation('softmax')(x)
# Define model from input and output
model = keras.Model(input, output, name="teacher")
print(model.summary())
return model
#Define method to build the teacher model (VGG16)
def build_student():
input = keras.Input(shape=(32, 32, 3), name="img")
x = Conv2D(64, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(input)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.3)(x)
# Block 2
x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
# Block 3
x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
# Block 4
x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
# Block 5
x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.4)(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
# Flatten and classification
x = Flatten()(x)
x = Dense(512)(x)
x = Activation('relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
# Out
x = Dense(10)(x)
output = Activation('softmax')(x)
# Define model from input and output
model = keras.Model(input, output, name="student")
print(model.summary())
return model
if __name__ == '__main__':
args = settings_parser.arg_parse()
print_during_epochs = True
student = build_student()
student_clone = build_student()
student_clone.set_weights(student.get_weights())
teacher = build_teacher()
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
Y_train = np_utils.to_categorical(y_train, 10)
Y_test = np_utils.to_categorical(y_test, 10)
train_datagen = ImageDataGenerator(
rescale=1. / 255, # rescale input image
featurewise_center=False, # set input mean to 0 over the dataset
samplewise_center=False, # set each sample mean to 0
featurewise_std_normalization=False, # divide inputs by std of the dataset
samplewise_std_normalization=False, # divide each input by its std
zca_whitening=False, # apply ZCA whitening
rotation_range=15, # randomly rotate images in the range (degrees, 0 to 180)
width_shift_range=0.1, # randomly shift images horizontally (fraction of total width)
height_shift_range=0.1, # randomly shift images vertically (fraction of total height)
horizontal_flip=True, # randomly flip images
vertical_flip=False) # randomly flip images)
train_datagen.fit(X_train)
train_generator = train_datagen.flow(X_train, Y_train, batch_size=64)
test_datagen = ImageDataGenerator(rescale=1. / 255)
test_generator = test_datagen.flow(X_test, Y_test, batch_size=64)
# Train teacher as usual
teacher.compile(optimizer=keras.optimizers.SGD(),
loss=keras.losses.categorical_crossentropy,
metrics=['accuracy'])
# Train and evaluate teacher on data.
teacher.fit(train_generator, validation_data=test_generator, epochs=5, verbose=print_during_epochs)
loss, acc = teacher.evaluate(test_generator)
print("Teacher model, accuracy: :5.2f%".format(100 * acc))
# Train student as doen usually
student_clone.compile(optimizer=keras.optimizers.SGD(),
loss=keras.losses.categorical_crossentropy,
metrics=['accuracy'])
# Train and evaluate student trained from scratch.
student_clone.fit(train_generator, validation_data=test_generator, epochs=5, verbose=print_during_epochs)
loss, acc = student_clone.evaluate(test_generator)
print("Student scratch model, accuracy: :5.2f%".format(100 * acc))
#print('\n\n'.format(teacher.summary(), student_clone.summary()))
# Train student using knowledge distillation
distiller = Distiller(student=student, teacher=teacher)
distiller.compile(optimizer=keras.optimizers.SGD(),
metrics=['accuracy'],
student_loss_fn=keras.losses.CategoricalCrossentropy(), # categorical_crossentropy,
distillation_loss_fn=keras.losses.KLDivergence(),
alpha=0.1,
temperature=10)
# Distill teacher to student
distiller.fit(X_train, Y_train, epochs=5) #THIS WORKS FINE
distiller.fit(train_generator, validation_data=test_generator, epochs=5,
verbose=print_during_epochs) # THIS DOESN'T WORK
# Evaluate student on test dataset
loss, acc = distiller.evaluate(test_generator)
print("Student distilled model, accuracy: :5.2f%".format(100 * acc))
【问题讨论】:
你能添加完整的回溯吗? 【参考方案1】:尝试将call
方法添加到您的 Distiller 类。下面是一个使用keras.Model
的子类的自动编码器示例:
class LSTM_Detector(Model):
def __init__(self, flight_len, param_len):
super(LSTM_Detector, self).__init__()
self.input_dim = (flight_len, param_len)
self.encoder = tf.keras.Sequential([
layers.LSTM(16,
return_sequences=True,
activation="relu",
input_shape=self.input_dim),
])
self.decoder = tf.keras.Sequential([
layers.LSTM(16,
return_sequences=True,
activation="relu"),
layers.TimeDistributed(layers.Dense(self.input_dim[1]))
])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
【讨论】:
感谢您的回复。我知道通过添加调用方法错误会消失,但仍然存在问题。 Distiller 模型是使用两个模型获得的,学生模型和教师模型,因此 Distiller 模型将有两个不同的输出。我不太确定拟合函数如何处理蒸馏器模型的两个输出。此外,我认为问题不在于调用方法,因为调用 distiller.fit(X_train, Y_train) 工作正常。这很奇怪,因为如果问题是调用方法,fit 应该会引发相同的错误,但事实并非如此。 在这种情况下可能来自“test_datagen”。以下是 .fit() 方法的 'keras.Models' 源代码的摘录:“请注意,validation_data
不支持x
中支持的所有数据类型,例如,dict、generator 或keras.utils.Sequence
. "
我之前已经成功地在validation_data中使用了生成器。但我不是使用 Functionnal API 的子类模型类。
我尝试在不传递validation_data = test_datagen
的情况下运行 fit 方法,但它引发了同样的错误。因此,问题与test_datagen结构无关。【参考方案2】:
通过添加 call() 方法并在 compile() 方法中启用 Eager Execution,这个问题就解决了。
即在编译方法中添加这一行 super(GAN, self).compile(run_eagerly=True)
请按照解决此问题的示例代码。
############ Customize what happens in Model.fit and Building GAN Like Structure #############
from tensorflow import keras
from keras.layers import Dense, Flatten, Reshape, Input, InputLayer, Conv2D, MaxPooling2D, ZeroPadding2D, UpSampling3D, MaxPooling3D, UpSampling2D, Conv3D, Conv2DTranspose
from keras.models import Sequential, Model
import numpy as np
import tensorflow as tf
from keras import layers
from keras import backend as K
# ## Design model
def upsample(filters, size, norm_type='batchnorm', apply_dropout=False):
"""Upsamples an input.
Conv2DTranspose => Batchnorm => Dropout => Relu
Args:
filters: number of filters
size: filter size
norm_type: Normalization type; either 'batchnorm' or 'instancenorm'.
apply_dropout: If True, adds the dropout layer
Returns:
Upsample Sequential Model
"""
initializer = tf.random_normal_initializer(0., 0.02)
result = tf.keras.Sequential()
result.add(
tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
padding='same',
kernel_initializer=initializer,
use_bias=False))
if norm_type.lower() == 'batchnorm':
result.add(tf.keras.layers.BatchNormalization())
elif norm_type.lower() == 'instancenorm':
result.add(InstanceNormalization())
if apply_dropout:
result.add(tf.keras.layers.Dropout(0.5))
result.add(tf.keras.layers.ReLU())
return result
# Create the generator
norm_type = 'batchnorm'
generator_resnet_model = tf.keras.applications.ResNet50V2(
include_top=False,
weights=None,
input_tensor=None,
input_shape=(224,224,3),
pooling=max,
)
Hog_Feature_Vector_generator = keras.Sequential(
[
keras.Input( shape=[224,224,3] ),
generator_resnet_model,
upsample(1024, 3, norm_type, apply_dropout=True),
upsample(512, 3, norm_type, apply_dropout=True),
upsample(256, 3, norm_type, apply_dropout=True),
Conv2D(128, 3, strides=(1, 1), padding='same', activation='relu'),
Conv2D(64, 3, strides=(1, 1), padding='same', activation='relu'),
Conv2D(32, 3, strides=(1, 1), padding='same', activation='relu'),
Conv2D(16, 3, strides=(1, 1), padding='same', activation='relu'),
Conv2D(9, 3, strides=(1, 1), padding='valid', activation='relu'),
layers.Reshape((26244,)),
],
name="Hog_Feature_Vector_generator",
)
print(Hog_Feature_Vector_generator.summary())
# Create 2 Discriminators
classifier_discriminator_resnet_model = tf.keras.applications.ResNet50V2(
include_top=True,
weights=None,
input_tensor=None,
input_shape=(216,216,3),
pooling=max,
classes=4,
classifier_activation=None,
)
Hog_Feature_Vector_Classifier = keras.Sequential(
[
keras.Input(shape=[26244,]),
layers.Reshape((54,54,9)),
upsample(18, 3, norm_type, apply_dropout=True),
upsample(36, 3, norm_type, apply_dropout=True),
Conv2D(18, 3, strides=(1, 1), padding='same', activation='relu'),
Conv2D(3, 3, strides=(1, 1), padding='same', activation='relu'),
classifier_discriminator_resnet_model,
],
name="Hog_Feature_Vector_Classifier",
)
print(Hog_Feature_Vector_Classifier.summary())
real_vs_fake_discriminator_resnet_model = tf.keras.applications.ResNet50V2(
include_top=True,
weights=None,
input_tensor=None,
input_shape=(216,216,3),
pooling=max,
classes=1,
classifier_activation="softmax",
)
real_vs_fake_Hog_Feature_Vector_Classifier = keras.Sequential(
[
keras.Input(shape=[26244,]),
layers.Reshape((54,54,9)),
upsample(18, 3, norm_type, apply_dropout=True),
upsample(36, 3, norm_type, apply_dropout=True),
Conv2D(18, 3, strides=(1, 1), padding='same', activation='relu'),
Conv2D(3, 3, strides=(1, 1), padding='same', activation='relu'),
real_vs_fake_discriminator_resnet_model,
],
name="real_vs_fake_Hog_Feature_Vector_Classifier",
)
print(real_vs_fake_Hog_Feature_Vector_Classifier.summary())
class GAN(keras.Model):
def __init__(self, discriminator_classifier, discriminator_fake_vs_real, generator):
super(GAN, self).__init__()
self.discriminator_classifier = discriminator_classifier
self.discriminator_fake_vs_real = discriminator_fake_vs_real
self.generator = generator
def call(self, input):
private_fvs = self.generator(input)
dec1_output = self.discriminator_classifier(private_fvs)
dec2_output = self.discriminator_fake_vs_real(private_fvs)
return private_fvs, dec1_output, dec2_output
def compile(self, d1_optimizer, d2_optimizer, g_optimizer, loss_fn_d1, loss_fn_d2):
super(GAN, self).compile(run_eagerly=True)
self.d1_optimizer = d1_optimizer
self.d2_optimizer = d2_optimizer
self.g_optimizer = g_optimizer
self.loss_fn_d1 = loss_fn_d1
self.loss_fn_d2 = loss_fn_d2
def train_step(self, data):
print(f"Eager execution mode: tf.executing_eagerly()")
# inp, trainLabels = data
# trainImages, trainFVs = inp
trainImages, trainFVs, trainLabels = data
## Inversing Labels for defining Not a Classidier #####
ones_array = np.ones( tf.shape (trainLabels) )
inverse_labels = ones_array - trainLabels
batch_size = tf.shape(trainImages)[0]
# Generate Private HoG Feature Vectors
generated_hog_fds = self.generator(trainImages)
# Combine them with real Feature Vectors
combined_features = tf.concat([generated_hog_fds, trainFVs], axis=0)
# Assemble labels discriminating real from fake Feature Vectors
labels = tf.concat([tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the Fake vs Real discriminator / d2
with tf.GradientTape() as tape:
predictions = self.discriminator_fake_vs_real(combined_features)
d2_loss = self.loss_fn_d2(labels, predictions)
grads = tape.gradient(d2_loss, self.discriminator_fake_vs_real.trainable_weights)
self.d2_optimizer.apply_gradients(
zip(grads, self.discriminator_fake_vs_real.trainable_weights)
)
# Train the discriminator Classifier / d1
with tf.GradientTape() as tape:
predictions = self.discriminator_classifier(generated_hog_fds)
d1_loss = self.loss_fn_d1(inverse_labels, predictions)
grads = tape.gradient(d1_loss, self.discriminator_classifier.trainable_weights)
self.d1_optimizer.apply_gradients(
zip(grads, self.discriminator_classifier.trainable_weights)
)
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator with computing loss from both discriminators (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions1 = self.discriminator_classifier(self.generator(trainImages))
predictions2 = self.discriminator_fake_vs_real(self.generator(trainImages))
g_loss_d1 = self.loss_fn_d1(inverse_labels, predictions1)
g_loss_d2 = self.loss_fn_d2(misleading_labels, predictions2)
g_loss = g_loss_d1 + g_loss_d2
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
return "d1_loss": d1_loss, "d2_loss": d2_loss, "g_loss": g_loss
gan = GAN(discriminator_classifier=Hog_Feature_Vector_Classifier,
discriminator_fake_vs_real = real_vs_fake_Hog_Feature_Vector_Classifier,
generator=Hog_Feature_Vector_generator
)
gan.compile(
d1_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
d2_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn_d1= keras.losses.categorical_crossentropy,
loss_fn_d2= keras.losses.BinaryCrossentropy(from_logits= True)
)
gan.fit(training_generator, epochs=2,
verbose = 2)
【讨论】:
以上是关于在子类模型中使用带有自定义 train_step 的 Keras ImageDataGenerator 时出错的主要内容,如果未能解决你的问题,请参考以下文章
使用带有子类 UINavigationController 的自定义 iOS 7 过渡偶尔会导致黑屏
:模型训练和预测的三种方法(fit&tf.GradientTape&train_step&tf.data)