How do you make TensorFlow + Keras fast with a TFRecord dataset?

Posted: 2017-06-30 07:20:09

【Question】

What is an example of how to use a TensorFlow TFRecord with a Keras model and tf.session.run(), while keeping the dataset in tensors with queue runners?

Below is a working snippet, but it needs the following improvements:

- Use the Model API and specify an Input()
- Load the dataset from a TFRecord
- Run the dataset in parallel (e.g. with a queuerunner)

Here is the snippet, with several TODO lines indicating what is needed:

from keras.models import Model
import tensorflow as tf
from keras import backend as K
from keras.layers import Dense, Input
from keras.objectives import categorical_crossentropy
from tensorflow.examples.tutorials.mnist import input_data

sess = tf.Session()
K.set_session(sess)

# Can this be done more efficiently than placeholders w/ TFRecords?
img = tf.placeholder(tf.float32, shape=(None, 784))
labels = tf.placeholder(tf.float32, shape=(None, 10))

# TODO: Use Input() 
x = Dense(128, activation='relu')(img)
x = Dense(128, activation='relu')(x)
preds = Dense(10, activation='softmax')(x)
# TODO: Construct model = Model(input=inputs, output=preds)

loss = tf.reduce_mean(categorical_crossentropy(labels, preds))

# TODO: handle TFRecord data, is it the same?
mnist_data = input_data.read_data_sets('MNIST_data', one_hot=True)

train_step = tf.train.GradientDescentOptimizer(0.5).minimize(loss)

sess.run(tf.global_variables_initializer())

# TODO remove default, add queuerunner
with sess.as_default():
    for i in range(1000):
        batch = mnist_data.train.next_batch(50)
        train_step.run(feed_dict={img: batch[0],
                                  labels: batch[1]})
    print(loss.eval(feed_dict={img: mnist_data.test.images,
                               labels: mnist_data.test.labels}))

Why does this question matter?

- High-performance training without going back to Python
- No TFRecord-to-numpy-to-tensor conversions
- Keras will soon be part of TensorFlow
- Demonstrate how the Keras Model() class can properly accept tensors for input data

Here is some starter information for a semantic segmentation example:

- An example unet Keras model, unet.py, which happens to be for semantic segmentation
- A Keras + Tensorflow blog post
- An attempt at running the unet model in a tf session with TFRecords and a Keras model (not working)
- Code that creates the TFRecords: tf_records.py
- An attempt at running the unet model in a tf session with TFRecords and a Keras model in densenet_fcn.py (not working)

【Comments】

github.com/tensorflow/tensorflow/issues/8787 will track full support for this functionality, beyond the quick fix provided in the accepted answer.
Update: pull request github.com/fchollet/keras/pull/6928

【Answer 1】

I don't use the tfrecord dataset format, so I won't argue the pros and cons, but I got interested in extending Keras to support it.

github.com/indraforyou/keras_tfrecord is the repository. I will briefly explain the main changes.

Dataset creation and loading

data_to_tfrecord and read_and_decode here take care of creating and loading the tfrecord dataset. Special care has to go into implementing read_and_decode, otherwise you will face cryptic errors during training.
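
For illustration, here is a minimal sketch of what such a read_and_decode might look like with the TF 1.x queue-based API. The feature names, shapes, and normalization below are assumptions for MNIST-style data, not the repository's actual code:

import tensorflow as tf

def read_and_decode_sketch(filename, n_class=10, one_hot=True):
    # A queue of input files; tf.train.start_queue_runners must be called later.
    filename_queue = tf.train.string_input_producer([filename])
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    # Feature names and dtypes are assumptions for this sketch.
    features = tf.parse_single_example(
        serialized_example,
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'label': tf.FixedLenFeature([], tf.int64),
        })

    # Decode the raw bytes, pin the static shape, normalize to [0, 1].
    image = tf.decode_raw(features['image_raw'], tf.uint8)
    image.set_shape([28 * 28])
    image = tf.cast(image, tf.float32) * (1. / 255)

    label = tf.cast(features['label'], tf.int32)
    if one_hot:
        label = tf.one_hot(label, n_class)
    return image, label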

Initialization and the Keras model

Now both tf.train.shuffle_batch and the Keras Input layer return tensors. But the one returned by tf.train.shuffle_batch does not carry the metadata Keras needs internally. It turns out that any tensor can easily be converted into a tensor with Keras metadata by calling the Input layer with the tensor argument.

So this takes care of initialization:

x_train_, y_train_ = ktfr.read_and_decode('train.mnist.tfrecord', one_hot=True, n_class=nb_classes, is_train=True)

x_train_batch, y_train_batch = K.tf.train.shuffle_batch([x_train_, y_train_],
                                                batch_size=batch_size,
                                                capacity=2000,
                                                min_after_dequeue=1000,
                                                num_threads=32) # set the number of threads here

x_train_inp = Input(tensor=x_train_batch)

Now any Keras model can be built on top of x_train_inp.
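
For example, a minimal sketch (the layer sizes are arbitrary and assume x_train_inp wraps a flat (batch, 784) image tensor):

from keras.layers import Dense

# x_train_inp carries the Keras metadata added by Input(tensor=...)
x = Dense(128, activation='relu')(x_train_inp)
train_out = Dense(nb_classes, activation='softmax')(x)

This train_out is the output tensor referred to in the training section below.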

Training (plain)

Suppose train_out is the output tensor of your Keras model. You can easily write a custom training loop for it:

loss = tf.reduce_mean(categorical_crossentropy(y_train_batch, train_out))
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)


sess.run(tf.global_variables_initializer())  # tf.initialize_all_variables() is deprecated

with sess.as_default():
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    try:
      step = 0
      while not coord.should_stop():
        start_time = time.time()

        _, loss_value = sess.run([train_op, loss], feed_dict={K.learning_phase(): 0})

        duration = time.time() - start_time

        if step % 100 == 0:
          print('Step %d: loss = %.2f (%.3f sec)' % (step, loss_value,
                                                     duration))
        step += 1
    except tf.errors.OutOfRangeError:
      print('Done training for %d epochs, %d steps.' % (FLAGS.num_epochs, step))
    finally:
      coord.request_stop()

    coord.join(threads)
    sess.close()

Training (Keras style)

One of the features that makes Keras so appealing is its generic training mechanism with callbacks.

But to support tfrecord-style training, the fit function needs a few changes:

- run the queue threads
- feed no batch data through feed_dict
- supporting validation becomes tricky, since the validation data also arrives through another tensor; internally a different model has to be created, sharing the upper layers with the training model and fed by the validation tensor from another tfrecord reader

But all of this can easily be supported with another flag parameter. What makes things messy are the Keras features sample_weight and class_weight, which are used to weight each sample and each class. For these, compile() creates placeholders (here), and a placeholder is also implicitly created for the targets (here), which is not needed in our case since the labels are already fed by the tfrecord readers. These placeholders need to be fed during the session run, which is unnecessary in our case.

So taking these changes into account, compile_tfrecord (here) and fit_tfrecord (here) are extensions of compile and fit that share 95% of the code.

They can be used as follows:

import keras_tfrecord as ktfr

train_model = Model(input=x_train_inp, output=train_out)
ktfr.compile_tfrecord(train_model, optimizer='rmsprop', loss='categorical_crossentropy', out_tensor_lst=[y_train_batch], metrics=['accuracy'])

train_model.summary()

ktfr.fit_tfrecord(train_model, X_train.shape[0], batch_size, nb_epoch=3)
train_model.save_weights('saved_wt.h5')
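
As an aside: later Keras 2.x releases added a target_tensors argument to compile that serves the same need without a separate compile function. A minimal sketch, assuming such a Keras version:

train_model.compile(optimizer='rmsprop',
                    loss='categorical_crossentropy',
                    target_tensors=[y_train_batch],
                    metrics=['accuracy'])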

You are welcome to improve the code and send pull requests.

【Comments】

Wow, this looks great! Maybe it's worth a pull request to the official keras upstream repository, keras-contrib? I'll try this out and then hopefully I'll award the answer credit + bounty. I also edited the links to use the current keras version hash, so the line numbers will stay correct.
Here is keras-contrib pull request #27

【Answer 2】

Update 2018-08-29: Keras now supports this directly; see the following example:

https://github.com/keras-team/keras/blob/master/examples/mnist_tfrecord.py

Original answer:

TFRecords are supported by using an external loss: the label tensor feeds the loss directly, the loss is attached with model.add_loss, and compile receives loss=None. Here are the key lines for constructing the external loss:

# tf yield ops that supply dataset images and labels
x_train_batch, y_train_batch = read_and_decode_recordinput(...)

# create a basic cnn
x_train_input = Input(tensor=x_train_batch)
x_train_out = cnn_layers(x_train_input)

model = Model(inputs=x_train_input, outputs=x_train_out)
loss = keras.losses.categorical_crossentropy(y_train_batch, x_train_out)
model.add_loss(loss)

model.compile(optimizer='rmsprop', loss=None)

Here is the example for Keras 2. It works after applying the small patch #7060:

'''MNIST dataset with TensorFlow TFRecords.

Gets to 99.25% test accuracy after 12 epochs
(there is still a lot of margin for parameter tuning).
'''
import os
import copy
import time

import numpy as np

import tensorflow as tf
from tensorflow.python.ops import data_flow_ops
from keras import backend as K
from keras.models import Model
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Flatten
from keras.layers import Input
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.callbacks import EarlyStopping
from keras.callbacks import TensorBoard
from keras.objectives import categorical_crossentropy
from keras.utils import np_utils
from keras.utils.generic_utils import Progbar
from keras import callbacks as cbks
from keras import optimizers, objectives
from keras import metrics as metrics_module

from keras.datasets import mnist

if K.backend() != 'tensorflow':
    raise RuntimeError('This example can only run with the '
                       'TensorFlow backend for the time being, '
                       'because it requires TFRecords, which '
                       'are not supported on other platforms.')


def images_to_tfrecord(images, labels, filename):
    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

    def _bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

    """ Save data into TFRecord """
    if not os.path.isfile(filename):
        num_examples = images.shape[0]

        rows = images.shape[1]
        cols = images.shape[2]
        depth = images.shape[3]

        print('Writing', filename)
        writer = tf.python_io.TFRecordWriter(filename)
        for index in range(num_examples):
            image_raw = images[index].tostring()
            example = tf.train.Example(features=tf.train.Features(feature={
                'height': _int64_feature(rows),
                'width': _int64_feature(cols),
                'depth': _int64_feature(depth),
                'label': _int64_feature(int(labels[index])),
                'image_raw': _bytes_feature(image_raw)}))
            writer.write(example.SerializeToString())
        writer.close()
    else:
        print('tfrecord %s already exists' % filename)


def read_and_decode_recordinput(tf_glob, one_hot=True, classes=None, is_train=None,
                                batch_shape=[1000, 28, 28, 1], parallelism=1):
    """ Return tensor to read from TFRecord """
    print('Creating graph for loading %s TFRecords...' % tf_glob)
    with tf.variable_scope("TFRecords"):
        record_input = data_flow_ops.RecordInput(
            tf_glob, batch_size=batch_shape[0], parallelism=parallelism)
        records_op = record_input.get_yield_op()
        records_op = tf.split(records_op, batch_shape[0], 0)
        records_op = [tf.reshape(record, []) for record in records_op]
        progbar = Progbar(len(records_op))

        images = []
        labels = []
        for i, serialized_example in enumerate(records_op):
            progbar.update(i)
            with tf.variable_scope("parse_images", reuse=True):
                features = tf.parse_single_example(
                    serialized_example,
                    features={
                        'label': tf.FixedLenFeature([], tf.int64),
                        'image_raw': tf.FixedLenFeature([], tf.string),
                    })
                img = tf.decode_raw(features['image_raw'], tf.uint8)
                img.set_shape(batch_shape[1] * batch_shape[2])
                img = tf.reshape(img, [1] + batch_shape[1:])

                img = tf.cast(img, tf.float32) * (1. / 255) - 0.5

                label = tf.cast(features['label'], tf.int32)
                if one_hot and classes:
                    label = tf.one_hot(label, classes)

                images.append(img)
                labels.append(label)

        images = tf.parallel_stack(images, 0)
        labels = tf.parallel_stack(labels, 0)
        images = tf.cast(images, tf.float32)

        images = tf.reshape(images, shape=batch_shape)

        # StagingArea will store tensors
        # across multiple steps to
        # speed up execution
        images_shape = images.get_shape()
        labels_shape = labels.get_shape()
        copy_stage = data_flow_ops.StagingArea(
            [tf.float32, tf.float32],
            shapes=[images_shape, labels_shape])
        copy_stage_op = copy_stage.put(
            [images, labels])
        staged_images, staged_labels = copy_stage.get()
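        # NOTE: this snippet builds the StagingArea but still returns the
        # unstaged tensors below; to actually benefit from staging you would
        # return staged_images/staged_labels and run copy_stage_op each step.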

        return images, labels


def save_mnist_as_tfrecord():
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    X_train = X_train[..., np.newaxis]
    X_test = X_test[..., np.newaxis]
    images_to_tfrecord(images=X_train, labels=y_train, filename='train.mnist.tfrecord')
    images_to_tfrecord(images=X_test, labels=y_test, filename='test.mnist.tfrecord')


def cnn_layers(x_train_input):
    x = Conv2D(32, (3, 3), activation='relu', padding='valid')(x_train_input)
    x = Conv2D(64, (3, 3), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    x = Flatten()(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.5)(x)
    x_train_out = Dense(classes,
                        activation='softmax',
                        name='x_train_out')(x)
    return x_train_out


sess = tf.Session()
K.set_session(sess)

save_mnist_as_tfrecord()

batch_size = 100
batch_shape = [batch_size, 28, 28, 1]
epochs = 3000
classes = 10
parallelism = 10

x_train_batch, y_train_batch = read_and_decode_recordinput(
    'train.mnist.tfrecord',
    one_hot=True,
    classes=classes,
    is_train=True,
    batch_shape=batch_shape,
    parallelism=parallelism)

x_test_batch, y_test_batch = read_and_decode_recordinput(
    'test.mnist.tfrecord',
    one_hot=True,
    classes=classes,
    is_train=True,
    batch_shape=batch_shape,
    parallelism=parallelism)


x_batch_shape = x_train_batch.get_shape().as_list()
y_batch_shape = y_train_batch.get_shape().as_list()

x_train_input = Input(tensor=x_train_batch, batch_shape=x_batch_shape)
x_train_out = cnn_layers(x_train_input)
y_train_in_out = Input(tensor=y_train_batch, batch_shape=y_batch_shape, name='y_labels')
cce = categorical_crossentropy(y_train_batch, x_train_out)
train_model = Model(inputs=[x_train_input], outputs=[x_train_out])
train_model.add_loss(cce)

train_model.compile(optimizer='rmsprop',
                    loss=None,
                    metrics=['accuracy'])
train_model.summary()

tensorboard = TensorBoard()

# tensorboard disabled due to Keras bug
train_model.fit(batch_size=batch_size,
                epochs=epochs)  # callbacks=[tensorboard])

train_model.save_weights('saved_wt.h5')

K.clear_session()

# Second Session, pure Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]
x_test_inp = Input(batch_shape=(None,) + (X_test.shape[1:]))
test_out = cnn_layers(x_test_inp)
test_model = Model(inputs=x_test_inp, outputs=test_out)

test_model.load_weights('saved_wt.h5')
test_model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])
test_model.summary()

loss, acc = test_model.evaluate(X_test, np_utils.to_categorical(y_test, classes))
print('\nTest accuracy: {0}'.format(acc))
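
For comparison, the approach the 2018 update refers to can also be written with the tf.data API instead of RecordInput. A minimal sketch under the same TF 1.x assumptions (the parse function mirrors the feature spec used above; filename, batch size, and parallelism are illustrative):

def parse_example(serialized):
    # Same feature spec as in read_and_decode_recordinput above.
    features = tf.parse_single_example(
        serialized,
        features={
            'label': tf.FixedLenFeature([], tf.int64),
            'image_raw': tf.FixedLenFeature([], tf.string),
        })
    image = tf.decode_raw(features['image_raw'], tf.uint8)
    image = tf.reshape(image, [28, 28, 1])
    image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
    label = tf.one_hot(tf.cast(features['label'], tf.int32), 10)
    return image, label

dataset = (tf.data.TFRecordDataset(['train.mnist.tfrecord'])
           .map(parse_example, num_parallel_calls=4)
           .shuffle(10000)
           .batch(100)
           .repeat())
images, labels = dataset.make_one_shot_iterator().get_next()
# images and labels can then be wrapped with Input(tensor=images) as above.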

I have also been working on improving support for TFRecords in the following issue and pull request:

- #6928 Yield op support: high performance large datasets via TFRecords and RecordInput
- #7102 Keras Input Tensor API design proposal

Finally, it is possible to use tf.contrib.learn.Experiment to train Keras models in TensorFlow.
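
A rough sketch of what that can look like, assuming TF 1.4+ where tf.keras.estimator.model_to_estimator is available; the compiled model and the two input_fn callables are assumptions, not code from this answer:

# Wrap a compiled Keras model as an Estimator, then drive it with Experiment.
estimator = tf.keras.estimator.model_to_estimator(keras_model=compiled_keras_model)
experiment = tf.contrib.learn.Experiment(
    estimator=estimator,
    train_input_fn=train_input_fn,  # e.g. a tf.data-based input_fn over TFRecords
    eval_input_fn=eval_input_fn,
    train_steps=10000)
experiment.train_and_evaluate()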

【Comments】

I could only get this example to work with the external loss after merging PR github.com/fchollet/keras/pull/7060, which fixes the generic_utils.py module.
For some reason data_flow_ops.RecordInput only returned the first batch, after which Keras considered the epoch finished and started another one. I don't know why. I realize it's hard for you to see what's going on, but do you have any suggestions for debugging this? Thanks a lot. I'm sure the tfrecord file I pass in is correct (it has over 60k images).
@NullSpace That should be a separate Stack Overflow question. With the current keras master one step == one epoch, so just run it repeatedly or try github.com/fchollet/keras/pull/7113.
