如何正确结合 TensorFlow 的 Dataset API 和 Keras?

Posted

技术标签:

【中文标题】如何正确结合 TensorFlow 的 Dataset API 和 Keras?【英文标题】:How to Properly Combine TensorFlow's Dataset API and Keras? 【发布时间】:2018-02-18 12:04:13 【问题描述】:

Keras 的fit_generator() 模型方法需要一个生成器,该生成器生成形状为(输入、目标)的元组,其中两个元素都是 NumPy 数组。 The documentation 似乎暗示如果我简单地将 Dataset iterator 包装在生成器中,并确保将张量转换为 NumPy 数组,我应该很高兴。然而,这段代码给了我一个错误:

import numpy as np
import os
import keras.backend as K
from keras.layers import Dense, Input
from keras.models import Model
import tensorflow as tf
from tensorflow.contrib.data import Dataset

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

with tf.Session() as sess:
    def create_data_generator():
        dat1 = np.arange(4).reshape(-1, 1)
        ds1 = Dataset.from_tensor_slices(dat1).repeat()

        dat2 = np.arange(5, 9).reshape(-1, 1)
        ds2 = Dataset.from_tensor_slices(dat2).repeat()

        ds = Dataset.zip((ds1, ds2)).batch(4)
        iterator = ds.make_one_shot_iterator()
        while True:
            next_val = iterator.get_next()
            yield sess.run(next_val)

datagen = create_data_generator()

input_vals = Input(shape=(1,))
output = Dense(1, activation='relu')(input_vals)
model = Model(inputs=input_vals, outputs=output)
model.compile('rmsprop', 'mean_squared_error')
model.fit_generator(datagen, steps_per_epoch=1, epochs=5,
                    verbose=2, max_queue_size=2)

这是我得到的错误:

Using TensorFlow backend.
Epoch 1/5
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 270, in __init__
    fetch, allow_tensor=True, allow_operation=True))
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 2708, in as_graph_element
    return self._as_graph_element_locked(obj, allow_tensor, allow_operation)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 2787, in _as_graph_element_locked
    raise ValueError("Tensor %s is not an element of this graph." % obj)
ValueError: Tensor Tensor("IteratorGetNext:0", shape=(?, 1), dtype=int64) is not an element of this graph.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jsaporta/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/jsaporta/anaconda3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/keras/utils/data_utils.py", line 568, in data_generator_task
    generator_output = next(self._generator)
  File "./datagen_test.py", line 25, in create_data_generator
    yield sess.run(next_val)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 895, in run
    run_metadata_ptr)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1109, in _run
    self._graph, fetches, feed_dict_tensor, feed_handles=feed_handles)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 413, in __init__
    self._fetch_mapper = _FetchMapper.for_fetch(fetches)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 233, in for_fetch
    return _ListFetchMapper(fetch)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 340, in __init__
    self._mappers = [_FetchMapper.for_fetch(fetch) for fetch in fetches]
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 340, in <listcomp>
    self._mappers = [_FetchMapper.for_fetch(fetch) for fetch in fetches]
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 241, in for_fetch
    return _ElementFetchMapper(fetches, contraction_fn)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 277, in __init__
    'Tensor. (%s)' % (fetch, str(e)))
ValueError: Fetch argument <tf.Tensor 'IteratorGetNext:0' shape=(?, 1) dtype=int64> cannot be interpreted as a Tensor. (Tensor Tensor("IteratorGetNext:0", shape=(?, 1), dtype=int64) is not an element of this graph.)

Traceback (most recent call last):
  File "./datagen_test.py", line 34, in <module>
    verbose=2, max_queue_size=2)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/keras/legacy/interfaces.py", line 87, in wrapper
    return func(*args, **kwargs)
  File "/home/jsaporta/anaconda3/lib/python3.6/site-packages/keras/engine/training.py", line 2011, in fit_generator
    generator_output = next(output_generator)
StopIteration

奇怪的是,在我初始化 datagen 之后直接添加包含 next(datagen) 的行会导致代码运行良好,没有错误。

为什么我的原始代码不起作用?为什么当我将该行添加到我的代码时它开始工作?有没有更有效的方法将 TensorFlow 的 Dataset API 与 Keras 一起使用,而不涉及将张量转换为 NumPy 数组并再次转换回来?

【问题讨论】:

我不确定是不是这个原因,但我觉得你在 with 块中定义一个函数真的很奇怪。 显然,将 with 块放在生成器定义中确实可以使代码在有和没有额外行的情况下都可以工作,尽管我可以发誓我首先尝试过这种方式。不过,考虑到(我认为)TensorFlow Sessions 的工作方式,我不明白为什么它应该有所作为。另一个谜。 with 块不会在会话结束时关闭会话吗?我认为它真的不应该包含将在它之外使用的定义......如果我将其作为问题的答案发布,它会被标记为已回答吗? 我不认为这个问题会得到回答。如果我们将sess = tf.InteractiveSession() 放在文件顶部并将with 块更改为with sess.as_default()(并将它放在生成器定义中),我们会得到与以前相同的错误。更改交互式会话并完全删除 with 块(因为它将自己设置为默认会话),也会产生相同的错误。我不清楚这是问题的症结所在。 我认为这确实是图表的“脱节”。一旦你在一个 numpy 数组中转换一个张量,你就会失去连接(它不再是一个张量)。有没有办法创建并行会话?也许您的生成器应该在其中创建子会话(独立于运行模型的会话),所以这样它就不会期望连接? 【参考方案1】:

其他答案很好,但重要的是要注意,直接将 from_tensor_slices 与大型 numpy 数组一起使用可以快速填满您的内存,因为 IIRC,值将作为 tf.constants 复制到图表中。根据我的经验,这将导致静默失败,训练最终会开始,但损失等方面不会有任何改善。

更好的方法是使用占位符。例如。这是我为图像及其 onehot 目标创建生成器的代码:

def create_generator_tf_dataset(self, images, onehots, batch_size):
    # Get shapes
    img_size = images.shape
    img_size = (None, img_size[1], img_size[2], img_size[3])
    onehot_size = onehots.shape
    onehot_size = (None, onehot_size[1])

    # Placeholders
    images_tensor = tf.placeholder(tf.float32, shape=img_size)
    onehots_tensor = tf.placeholder(tf.float32, shape=onehot_size)

    # Dataset
    dataset = tf.data.Dataset.from_tensor_slices((images_tensor, onehots_tensor))
    # Map function (e.g. augmentation)
    if map_fn is not None:
        dataset = dataset.map(lambda x, y: (map_fn(x), y), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    # Combined shuffle and infinite repeat
    dataset = dataset.apply(
        tf.data.experimental.shuffle_and_repeat(len(images), None))  
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(1)

    # Make the iterator
    iterator = dataset.make_initializable_iterator()
    init_op = iterator.initializer
    next_val = iterator.get_next()

    with K.get_session().as_default() as sess:
        sess.run(init_op, feed_dict=images_tensor: images, onehots_tensor: onehots)
        while True:
            inputs, labels = sess.run(next_val)
            yield inputs, labels

【讨论】:

【参考方案2】:

根据我最近的经验,一个重要的观察结果是使用 tf.keras 而不是原生 keras。为我工作 tf > 1.12。

希望它也可以帮助其他人。

【讨论】:

【参考方案3】:

2018 年 6 月 9 日更新

从 Tensorflow 1.9 开始,可以将tf.data.Dataset 对象直接传递给keras.Model.fit(),其作用类似于fit_generator。 可以在此 gist 上找到完整的示例。
# Load mnist training data
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
training_set = tfdata_generator(x_train, y_train,is_training=True)

model = # your keras model here              
model.fit(
    training_set.make_one_shot_iterator(),
    steps_per_epoch=len(x_train) // 128,
    epochs=5,
    verbose = 1)
tfdata_generator 是一个返回可迭代 tf.data.Dataset 的函数。
def tfdata_generator(images, labels, is_training, batch_size=128):
  '''Construct a data generator using `tf.Dataset`. '''

  def map_fn(image, label):
      '''Preprocess raw data to trainable input. '''
    x = tf.reshape(tf.cast(image, tf.float32), (28, 28, 1))
    y = tf.one_hot(tf.cast(label, tf.uint8), _NUM_CLASSES)
    return x, y

  dataset = tf.data.Dataset.from_tensor_slices((images, labels))

  if is_training:
    dataset = dataset.shuffle(1000)  # depends on sample size
  dataset = dataset.map(map_fn)
  dataset = dataset.batch(batch_size)
  dataset = dataset.repeat()
  dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE)

  return dataset

旧解决方案:

除了@Yu-Yang的回答,你还可以修改tf.data.Dataset成为fit_generator的生成器,如下

from tensorflow.contrib.learn.python.learn.datasets import mnist

data   = mnist.load_mnist()
model  = # your Keras model
model.fit_generator(generator = tfdata_generator(data.train.images, data.train.labels),
                    steps_per_epoch=200,
                    workers = 0 , # This is important
                    verbose = 1)


def tfdata_generator(images, labels, batch_size=128, shuffle=True,):
    def map_func(image, label):
        '''A transformation function'''
        x_train = tf.reshape(tf.cast(image, tf.float32), image_shape)
        y_train = tf.one_hot(tf.cast(label, tf.uint8), num_classes)
        return [x_train, y_train]

    dataset  = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset  = dataset.map(map_func)
    dataset  = dataset.shuffle().batch(batch_size).repeat()
    iterator = dataset.make_one_shot_iterator()

    next_batch = iterator.get_next()
    while True:
        yield K.get_session().run(next_batch)

【讨论】:

这是 AFAIK 向 keras 提供验证数据的唯一方法,使用 fit_generator 的 validation_data 参数 训练完模型后在evaluate_generator()中再次使用tfdata_generator()时,遇到了同样的tfdata_generate()函数无法运行的问题。似乎命令“K.get_session()”不能运行两次。有没有想过纠正这个问题? workers=0这一行很重要。基本上,如果workers > 0,你最终会遇到多线程问题,因为多个线程试图评估同一个生成器。如果你调用一次生成器(初始化它),它会工作,因为你已经创建了端点,但你可能会得到奇怪的结果,因为它不是线程安全的 K.get_session().run(next_batch) 的结果将是一个 numpy 数组列表,不是吗?我认为这个想法是避免回到 python 层并留在 tensorflow 中...... 我想知道当make_one_shot_iterator() 只支持通过数据集迭代一次时,Keras 是如何做到 5 个 epoch 的?【参考方案4】:

@Yu_Yang 和@Dat-Nguyen 的解决方案都可以正常工作。通过使用可馈送迭代器并将验证集的句柄作为验证“数据”传递,@Yu-Yang 的解决方案也可以在训练期间支持验证集。这有点令人费解,但它确实有效。

您还可以将 Keras 模型转换为 Estimator,它们支持数据集:

estimator = tf.keras.estimator.model_to_estimator(keras_model=model,
                                                  model_dir=model_dir)
input_name = model.layers[0].input.op.name

def input_fn(dataset):
    dataset = dataset.map(lambda X,y: input_name: X, y)
    return dataset.make_one_shot_iterator().get_next()

train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: input_fn(train_set), max_steps=100)
eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: input_fn(test_set))

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

【讨论】:

你能详细说明一下@Yu-Yang 的解决方案如何获取验证数据吗?我尝试传递 validation_data 参数,但 Keras 在使用本机张量时会引发错误【参考方案5】:

如果您使用 Pandas 库创建 TensorFlow 数据集,这里有一个解决方案。请注意,如果没有tf.reshape(),此代码将无法工作,因为出于某种原因,来自tf.py_func() 的张量没有形状信息。所以这不适用于tuple。有人有解决方法吗?

def _get_input_data_for_dataset(file_name):
     df_input=pd.read_csv(file_name.decode(),usecols=['Wind_MWh'])            

     X_data = df_input.as_matrix()

     return X_data.astype('float32', copy=False)

X_dataset = tf.data.Dataset.from_tensor_slices(file_names)
X_dataset = X_dataset.flat_map(lambda file_name: tf.data.Dataset.from_tensor_slices(
                            tf.reshape(tf.py_func(_get_input_data_for_dataset,[file_name], tf.float32),[-1,1])))

X_dataset = X_dataset.batch(5)
X_iter = X_dataset.make_one_shot_iterator()
X_batch = X_iter.get_next()
input_X1 = Input(tensor= X_batch ,name='input_X1')

y1 = Dense(units=64, activation='relu',kernel_initializer=tf.keras.initializers.Constant(1),name='layer_FC1')(input_X1)

【讨论】:

【参考方案6】:

确实有一种更有效的方式来使用Dataset,而无需将张量转换为numpy 数组。但是,它(还没有?)在官方文档中。从发行说明来看,它是 Keras 2.0.7 中引入的一个功能。您可能需要安装 keras>=2.0.7 才能使用它。

x = np.arange(4).reshape(-1, 1).astype('float32')
ds_x = Dataset.from_tensor_slices(x).repeat().batch(4)
it_x = ds_x.make_one_shot_iterator()

y = np.arange(5, 9).reshape(-1, 1).astype('float32')
ds_y = Dataset.from_tensor_slices(y).repeat().batch(4)
it_y = ds_y.make_one_shot_iterator()

input_vals = Input(tensor=it_x.get_next())
output = Dense(1, activation='relu')(input_vals)
model = Model(inputs=input_vals, outputs=output)
model.compile('rmsprop', 'mse', target_tensors=[it_y.get_next()])
model.fit(steps_per_epoch=1, epochs=5, verbose=2)

几个不同之处:

    tensor 参数提供给Input 层。 Keras 将从该张量中读取值,并将其用作拟合模型的输入。 将target_tensors 参数提供给Model.compile()。 记得将 x 和 y 都转换为 float32。在正常使用情况下,Keras 会为你做这个转换。但现在您必须自己动手。 在Dataset 的构造过程中指定了批处理大小。使用steps_per_epochepochs 控制何时停止模型拟合。

简而言之,如果要从张量中读取数据,请使用 Input(tensor=...)model.compile(target_tensors=...)model.fit(x=None, y=None, ...)

【讨论】:

看起来甚至没有必要拥有两个单独的迭代器。您可以压缩这两个数据集,创建一个类似next_val = it.get_next() 的节点,并将其输出元素提供给Input()Model.compile() 函数。 迭代器初始化怎么样?我可以以某种方式告诉 keras 在每个时代初始化它吗?还是我仍然需要创建会话并手动执行,然后每次只运行一个 epoch? 所以我尝试了这种方法,它不仅使模型很难使用(您必须保存并重新加载它才能为其提供验证数据等输入),而且它似乎也是较慢(kaggle.com/kmader/tensorflow-data-keras-tensors-retinopathy vs kaggle.com/kmader/inceptionv3-for-retinopathy-gpu-hr)从每次迭代 10 秒(使用 session.run() 和正常拟合)到每次迭代 15 秒(使用 Input(tensor= 和 target_tensors) @yu-yang 如果我们要保存这样创建的模型,当我们加载它时,它有一个普通的Input 层。我们如何将 TF 张量绑定到这个输入? 我认为最好的方法是使用相同的代码重新创建模型,然后使用model.load_weights(加载model.save_weights保存的权重文件)。 official example 也使用这种方法来保存/加载模型。

以上是关于如何正确结合 TensorFlow 的 Dataset API 和 Keras?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Cloud TPU 与 Tensorflow Lite Model Maker 结合使用?

将 Keras 和 Tensorflow 与 AMD GPU 结合使用

Tensorflow:如何正确使用 Adam 优化器

Tensorflow显示图片

如何正确地重新标记 TensorFlow 数据集?

sh 如何正确安装tensorflow_cc