在 TensorFlow 中导入巨大的非图像数据集
Posted
技术标签:
【中文标题】在 TensorFlow 中导入巨大的非图像数据集【英文标题】:Import huge non-image dataset in TensorFlow 【发布时间】:2018-11-21 16:02:24 【问题描述】:我有一个大数据集(300.000 个示例 x 33.000 个特征),这当然不适合内存。数据以 HDF5 格式保存。这些值大多为零(稀疏数据)。它们看起来像这样:
Attr1 52 52 52 52 52 52 52 52 ...
Attr2 umb umb umb umb umb umb umb umb ...
CellID TGC-1 TGG-1 CAG-1 TTC-1 GTG-1 GTA-1 CAA-1 CAC-1 ...
Acc Gene ...
243485 RP11-.3 0 0 0 0 0 0 0 0 ...
237613 FAM138A 0 0 0 0 0 0 0 0 ...
186092 OR4F5 0 0 0 0 0 0 0 0 ...
238009 RP11-.7 0 0 0 0 0 0 0 0 ...
239945 RP11-.8 0 0 0 0 0 0 0 0 ...
279457 FO538.2 0 0 0 0 0 0 0 0 ...
228463 AP006.2 0 0 0 0 0 0 0 0 ...
... ... ... ... ... ... ... ... ... ...
我已经完成了以下工作,将整个数据集加载到 TensorFlow 中(loompy
只是一个在后台使用 hdf5 的包):
import tensorflow as tf
import numpy as np
import loompy as lp
batch_size = 1000
with loompy.connect(filename, 'r') as ds:
ds_shape = (batch_size, ds.shape[0])
ds_dtype = ds[0:1, 0:1].dtype
labels = np.asarray([ds.ca.CellID, ds.ca.Attr1]).T
labels_shape = (batch_size, 1)
data_placeholder = tf.placeholder(ds_dtype, ds_shape)
labels_placeholder = tf.placeholder(labels[:,1].dtype, labels_shape)
dataset = tf.data.Dataset.from_tensor_slices((data_placeholder, labels_placeholder))
dataset = dataset.prefetch(batch_size)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
with loompy.connect(filename, 'r') as ds:
for i in range(0, ds.shape[1], batch_size):
batch = ds[0 : ds_shape[1], i : i + batch_size].T
batch_labels = np.asarray([ds.ca.CellID[i : i + batch_size],
ds.ca.Attr1[i : i + batch_size]]).T[:,1]
sess.run(iterator.initializer, feed_dict = data_placeholder: batch,
labels_placeholder: batch_labels.reshape(batch_size, 1))
for _ in range(batch_size):
print(sess.run(next_element))
输出:
(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))
(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))
...
但是,通过这种方式,我无法在训练、测试和评估集中拆分数据。另外,我只能在每个批次中对其进行洗牌,这并不有效,因为大多数时候批次上的数据属于同一类。
如何处理此类数据,以便能够将它们加载为训练集、测试集、评估集并执行洗牌等(最好尽可能多地利用我的 TitanX GPU)?
【问题讨论】:
您可能想要使用 tfrecords 并将它们存储在稀疏特征中:tensorflow.org/versions/r1.2/api_docs/python/tf/SparseFeature @vijaym TFRecords 的问题是大多数示例都与图像相关,所以我还没有弄清楚如何使用这样的数据集来做到这一点。你能指出我正确的来源吗? 您可以查看我对 Numpy 数组的回答,您可能需要对稀疏矩阵进行一些更改:***.com/questions/45427637/… 您是否愿意对磁盘上的数据进行第二次(混洗和拆分)副本?有许多大型数据处理工具可以为您做到这一点。 @Omegastick 我对所有可以正常工作的东西持开放态度。你能说出你所说的工具吗? 【参考方案1】:您绝对应该尝试Dask,它允许您处理不适合内存的数据,并且它会导致计算瘫痪,以便您可以使用 cpu 的所有内核。此外,我建议将您的数据从 hdf 移动到 parquet,它允许并发读取和写入,从而加快速度。请查看 Wes McKinney(熊猫创建者)深入了解并与其他格式进行比较的链接。
您可以在 Dask 中准备 sn-ps,准备训练、测试和验证集并在不超过可用内存的情况下读取它们。
【讨论】:
它们与 TensorFlow 的融合程度如何?我在网上找不到太多信息,除了一些人在尝试它。没有太多时间进行实验,我只想要一些效果很好的东西,这样我就可以专注于构建模型。 据我所知,它不会混合任何库来并行处理大量数据,您必须编写一个满足您需求的函数,不会预先构建任何东西来提供张量流模型。幸好 api 和 pandas 中的几乎一模一样,所以非常简单。 好吧,我使用的 loom 格式与 Pandas 数据框非常相似,但这并不能帮助我导入数据并进行我想要的数据处理。我需要一个关于如何处理这些大数据以在 TensorFlow 中执行不同任务(例如洗牌)的答案。 这正是我推荐 parquet + dask 的原因,它允许您处理比 RAM 大得多的文件,就像使用 pandas 一样。【参考方案2】:如果有人仍然对此主题感兴趣,这是我对这个问题的解决方案。最后我坚持使用 Loompy 文件格式,因为我正在做的事情真的很方便(看看Loompy here)。为了在我的模型中导入如此大量的信息,我使用了tf.data.Dataset
TensorFlow API 的from_generator()
函数。另外,我创建了一个生成器来根据需要生成数据。
下面是我的输入函数的样子:
import loompy as lp
import tensorflow as tf
from sklearn.model_selection import train_test_split
model_input_name = ""
input_size = 10000
batch_size = 32
epochs = 10
# Input functions for train, test and eval sets.
def train_input_fn():
return _input_fn('TRAIN')
def test_input_fn():
return _input_fn('TEST')
def eval_input_fn():
return _input_fn('EVAL')
# General purpose input function
def _input_fn(mode = 'TRAIN'):
"""
Arguments
mode : 'TRAIN', 'TEST', 'EVAL'
"""
# A generator to yield data and labels from the given FILE,
# based on the indices assigned to the "indices" variable.
# If you change the labels, remember to update the from_generator()
# parameters below, to reflect their datatype.
def gen():
with lp.connect(FILE, 'r') as ds:
if ae:
for i in indices:
yield model_input_name: ds[:, i], ds[:, i]
else:
for i in indices:
yield model_input_name: ds[:, i], ds.ca.x_CellType[i]
# Get the indices for train, test and eval sets
train_idx, test_idx, eval_idx = train_test_set_idx_split(TRAIN_RT, TEST_RT, EVAL_RT)
# Check condition and assign the respective set to the "indices" variable
if mode == 'TRAIN':
indices = train_idx
elif mode == 'TEST':
indices = test_idx
elif mode == 'EVAL':
indices = eval_idx
else:
print("Wrong mode choice: ", mode)
exit(1)
dataset = tf.data.Dataset.from_generator(gen, (model_input_name: tf.int64, tf.int64),
output_shapes=(model_input_name: [input_size,], []))
# Shuffle, batch, map, prefetch and repeat your dataset.
# If you need to do some preprocessing on the data, create your function on
# the cell above, and call it within a map() function.
dataset = dataset.shuffle(buffer_size=batch_size*50)
dataset = dataset.batch(batch_size)
dataset = dataset.map(_reshape_labels)
dataset = dataset.map(_int2float)
# Map on whatever other functions you need
dataset = dataset.map( ... )
dataset = dataset.prefetch(2)
dataset = dataset.repeat(epochs)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
# Get train, test, eval indices for the given dataset
def train_test_set_idx_split(train_rt, test_rt, eval_rt):
""" This function returns indices for the train, test and evaluation sets,
given an input Dataset.
Arguments:
train_rt: ratio of the train dataset
test_rt: ratio of the test dataset
eval_rt: ratio of the evaluation dataset
Returns:
train_idx: indices (of the given dataset) for the train dataset
test_idx: indices (of the given dataset) for the test dataset
evel_idx: indices (of the given dataset) for the evaluation dataset
Note:
This function will work correctly as long as (test_rt == evel_rt) is True.
If you need (test_rt != evel_rt), you need something more sophisticated.
"""
with lp.connect(FILE, 'r') as ds:
idx = np.array(range(0, ds.shape[1]))
train_idx, test_idx = train_test_split(idx, train_size=train_rt, test_size=test_rt+eval_rt)
test_idx, eval_idx = train_test_split(test_idx, train_size=0.5, test_size=0.5)
return train_idx, test_idx, eval_idx
# Reshape labels as needed
def _reshape_labels(data, labels):
return data, tf.reshape(labels, (-1,1))
【讨论】:
【参考方案3】:dask 和 tensorflow 等机器学习框架之间的差距实际上是分布式内存缓存。
我们可以看到dask对于预处理部分来说是一个非常不错的选择,但是如何将预处理后的数据传输到tensorflow是我们比较担心的。
Vineyard(v6d.io) 通过提供与 dask 等数据引擎和 tf 和 pytorch 等 ml 引擎的集成来解决中间数据共享问题。 这是一个示例 (https://v6d.io/examples/distributed-learning.html),展示了如何将预处理后的数据从 dask 传输到 horovod.keras,希望对您有所帮助。
【讨论】:
以上是关于在 TensorFlow 中导入巨大的非图像数据集的主要内容,如果未能解决你的问题,请参考以下文章
在数组中导入数据集时,谷歌 colab 中的 ram 用完了
TensorFlow - 从 TensorBoard TFEvent 文件中导入数据?