在 TensorFlow 中导入巨大的非图像数据集

Posted

技术标签:

【中文标题】在 TensorFlow 中导入巨大的非图像数据集【英文标题】:Import huge non-image dataset in TensorFlow 【发布时间】:2018-11-21 16:02:24 【问题描述】:

我有一个大数据集(300.000 个示例 x 33.000 个特征),这当然不适合内存。数据以 HDF5 格式保存。这些值大多为零(稀疏数据)。它们看起来像这样:

           Attr1    52  52  52  52  52  52  52  52 ...
           Attr2    umb umb umb umb umb umb umb umb ...
           CellID   TGC-1 TGG-1 CAG-1 TTC-1 GTG-1 GTA-1 CAA-1 CAC-1 ...

Acc     Gene                                      ...
243485  RP11-.3     0   0   0   0   0   0   0   0 ...
237613  FAM138A     0   0   0   0   0   0   0   0 ...
186092  OR4F5       0   0   0   0   0   0   0   0 ...
238009  RP11-.7     0   0   0   0   0   0   0   0 ...
239945  RP11-.8     0   0   0   0   0   0   0   0 ...
279457  FO538.2     0   0   0   0   0   0   0   0 ...
228463  AP006.2     0   0   0   0   0   0   0   0 ...
...     ...         ... ... ... ... ... ... ... ...

我已经完成了以下工作,将整个数据集加载到 TensorFlow 中(loompy 只是一个在后台使用 hdf5 的包):

import tensorflow as tf
import numpy as np
import loompy as lp

batch_size = 1000

with loompy.connect(filename, 'r') as ds:
    ds_shape = (batch_size, ds.shape[0])
    ds_dtype = ds[0:1, 0:1].dtype

    labels = np.asarray([ds.ca.CellID, ds.ca.Attr1]).T
    labels_shape = (batch_size, 1)

data_placeholder = tf.placeholder(ds_dtype, ds_shape)
labels_placeholder = tf.placeholder(labels[:,1].dtype, labels_shape)

dataset = tf.data.Dataset.from_tensor_slices((data_placeholder, labels_placeholder))
dataset = dataset.prefetch(batch_size)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
    with loompy.connect(filename, 'r') as ds:
        for i in range(0, ds.shape[1], batch_size):
            batch = ds[0 : ds_shape[1], i : i + batch_size].T
            batch_labels = np.asarray([ds.ca.CellID[i : i + batch_size],
                                       ds.ca.Attr1[i : i + batch_size]]).T[:,1]

            sess.run(iterator.initializer, feed_dict = data_placeholder: batch,
                       labels_placeholder: batch_labels.reshape(batch_size, 1))

            for _ in range(batch_size):
                print(sess.run(next_element))

输出:

(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))

(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))

...

但是,通过这种方式,我无法在训练、测试和评估集中拆分数据。另外,我只能在每个批次中对其进行洗牌,这并不有效,因为大多数时候批次上的数据属于同一类。

如何处理此类数据,以便能够将它们加载为训练集、测试集、评估集并执行洗牌等(最好尽可能多地利用我的 TitanX GPU)?

【问题讨论】:

您可能想要使用 tfrecords 并将它们存储在稀疏特征中:tensorflow.org/versions/r1.2/api_docs/python/tf/SparseFeature @vijaym TFRecords 的问题是大多数示例都与图像相关,所以我还没有弄清楚如何使用这样的数据集来做到这一点。你能指出我正确的来源吗? 您可以查看我对 Numpy 数组的回答,您可能需要对稀疏矩阵进行一些更改:***.com/questions/45427637/… 您是否愿意对磁盘上的数据进行第二次(混洗和拆分)副本?有许多大型数据处理工具可以为您做到这一点。 @Omegastick 我对所有可以正常工作的东西持开放态度。你能说出你所说的工具吗? 【参考方案1】:

您绝对应该尝试Dask,它允许您处理不适合内存的数据,并且它会导致计算瘫痪,以便您可以使用 cpu 的所有内核。此外,我建议将您的数据从 hdf 移动到 parquet,它允许并发读取和写入,从而加快速度。请查看 Wes McKinney(熊猫创建者)深入了解并与其他格式进行比较的链接。

您可以在 Dask 中准备 sn-ps,准备训练、测试和验证集并在不超过可用内存的情况下读取它们。

【讨论】:

它们与 TensorFlow 的融合程度如何?我在网上找不到太多信息,除了一些人在尝试它。没有太多时间进行实验,我只想要一些效果很好的东西,这样我就可以专注于构建模型。 据我所知,它不会混合任何库来并行处理大量数据,您必须编写一个满足您需求的函数,不会预先构建任何东西来提供张量流模型。幸好 api 和 pandas 中的几乎一模一样,所以非常简单。 好吧,我使用的 loom 格式与 Pandas 数据框非常相似,但这并不能帮助我导入数据并进行我想要的数据处理。我需要一个关于如何处理这些大数据以在 TensorFlow 中执行不同任务(例如洗牌)的答案。 这正是我推荐 parquet + dask 的原因,它允许您处理比 RAM 大得多的文件,就像使用 pandas 一样。【参考方案2】:

如果有人仍然对此主题感兴趣,这是我对这个问题的解决方案。最后我坚持使用 Loompy 文件格式,因为我正在做的事情真的很方便(看看Loompy here)。为了在我的模型中导入如此大量的信息,我使用了tf.data.Dataset TensorFlow API 的from_generator() 函数。另外,我创建了一个生成器来根据需要生成数据。

下面是我的输入函数的样子:

import loompy as lp
import tensorflow as tf
from sklearn.model_selection import train_test_split

model_input_name = ""
input_size = 10000
batch_size = 32
epochs = 10

# Input functions for train, test and eval sets.
def train_input_fn():
    return _input_fn('TRAIN')

def test_input_fn():
    return _input_fn('TEST')

def eval_input_fn():
    return _input_fn('EVAL')

# General purpose input function
def _input_fn(mode = 'TRAIN'):

    """
        Arguments
            mode : 'TRAIN', 'TEST', 'EVAL'
    """

    # A generator to yield data and labels from the given FILE,
    # based on the indices assigned to the "indices" variable.
    # If you change the labels, remember to update the from_generator()
    # parameters below, to reflect their datatype.
    def gen():
        with lp.connect(FILE, 'r') as ds:
            if ae:
                for i in indices:
                    yield model_input_name: ds[:, i], ds[:, i]
            else:
                for i in indices:
                    yield model_input_name: ds[:, i], ds.ca.x_CellType[i]

    # Get the indices for train, test and eval sets
    train_idx, test_idx, eval_idx = train_test_set_idx_split(TRAIN_RT, TEST_RT, EVAL_RT)

    # Check condition and assign the respective set to the "indices" variable
    if mode == 'TRAIN':
        indices = train_idx
    elif mode == 'TEST':
        indices = test_idx
    elif mode == 'EVAL':
        indices = eval_idx
    else:
        print("Wrong mode choice: ", mode)
        exit(1)

    dataset = tf.data.Dataset.from_generator(gen, (model_input_name: tf.int64, tf.int64),
                                             output_shapes=(model_input_name: [input_size,], []))

    # Shuffle, batch, map, prefetch and repeat your dataset.
    # If you need to do some preprocessing on the data, create your function on
    # the cell above, and call it within a map() function.

    dataset = dataset.shuffle(buffer_size=batch_size*50)
    dataset = dataset.batch(batch_size)

    dataset = dataset.map(_reshape_labels)
    dataset = dataset.map(_int2float)

    # Map on whatever other functions you need
    dataset = dataset.map( ... )

    dataset = dataset.prefetch(2)
    dataset = dataset.repeat(epochs)

    iterator = dataset.make_one_shot_iterator()

    return iterator.get_next()


# Get train, test, eval indices for the given dataset
def train_test_set_idx_split(train_rt, test_rt, eval_rt):
    """ This function returns indices for the train, test and evaluation sets,
        given an input Dataset.
        Arguments:
            train_rt: ratio of the train dataset
            test_rt:  ratio of the test dataset
            eval_rt:  ratio of the evaluation dataset

        Returns:
            train_idx: indices (of the given dataset) for the train dataset
            test_idx:  indices (of the given dataset) for the test dataset
            evel_idx:  indices (of the given dataset) for the evaluation dataset

        Note:
            This function will work correctly as long as (test_rt == evel_rt) is True.
            If you need (test_rt != evel_rt), you need something more sophisticated.
    """

    with lp.connect(FILE, 'r') as ds:
        idx = np.array(range(0, ds.shape[1]))

    train_idx, test_idx = train_test_split(idx, train_size=train_rt, test_size=test_rt+eval_rt)
    test_idx, eval_idx = train_test_split(test_idx, train_size=0.5, test_size=0.5)

    return train_idx, test_idx, eval_idx

# Reshape labels as needed
def _reshape_labels(data, labels):
    return data, tf.reshape(labels, (-1,1))

【讨论】:

【参考方案3】:

dask 和 tensorflow 等机器学习框架之间的差距实际上是分布式内存缓存。

我们可以看到dask对于预处理部分来说是一个非常不错的选择,但是如何将预处理后的数据传输到tensorflow是我们比较担心的。

Vineyard(v6d.io) 通过提供与 dask 等数据引擎和 tf 和 pytorch 等 ml 引擎的集成来解决中间数据共享问题。 这是一个示例 (https://v6d.io/examples/distributed-learning.html),展示了如何将预处理后的数据从 dask 传输到 horovod.keras,希望对您有所帮助。

【讨论】:

以上是关于在 TensorFlow 中导入巨大的非图像数据集的主要内容,如果未能解决你的问题,请参考以下文章

在数组中导入数据集时,谷歌 colab 中的 ram 用完了

TensorFlow - 从 TensorBoard TFEvent 文件中导入数据?

无法在 vscode 中导入 tensorflow

在 tensorflow 2 中导入 tensorflow 模块很慢

尝试在Anaconda中导入Tensorflow时出错

无法在 tensorflow r1.14 中导入“tensorflow.contrib.tensorrt”