将 .tfrecords 文件拆分为多个 .tfrecords 文件

Posted

技术标签:

【中文标题】将 .tfrecords 文件拆分为多个 .tfrecords 文件【英文标题】:Split .tfrecords file into many .tfrecords files 【发布时间】:2019-06-28 09:08:04 【问题描述】:

有没有什么方法可以直接将 .tfrecords 文件拆分成多个 .tfrecords 文件,而不用回写每个 Dataset 示例?

【问题讨论】:

【参考方案1】:

在 tensorflow 2.0.0 中,这将起作用:

import tensorflow as tf

raw_dataset = tf.data.TFRecordDataset("input_file.tfrecord")

shards = 10

for i in range(shards):
    writer = tf.data.experimental.TFRecordWriter(f"output_file-part-i.tfrecord")
    writer.write(raw_dataset.shard(shards, i))

【讨论】:

【参考方案2】:

你可以使用这样的函数:

import tensorflow as tf

def split_tfrecord(tfrecord_path, split_size):
    with tf.Graph().as_default(), tf.Session() as sess:
        ds = tf.data.TFRecordDataset(tfrecord_path).batch(split_size)
        batch = ds.make_one_shot_iterator().get_next()
        part_num = 0
        while True:
            try:
                records = sess.run(batch)
                part_path = tfrecord_path + '.:03d'.format(part_num)
                with tf.python_io.TFRecordWriter(part_path) as writer:
                    for record in records:
                        writer.write(record)
                part_num += 1
            except tf.errors.OutOfRangeError: break

例如,要将文件 my_records.tfrecord 拆分为每部分 100 条记录,您可以:

split_tfrecord(my_records.tfrecord, 100)

这将创建多个较小的记录文件my_records.tfrecord.000my_records.tfrecord.001 等。

【讨论】:

【参考方案3】:

使用.batch() 而不是.shard() 来避免多次迭代数据集

一种更高效的方法(与使用 tf.data.Dataset.shard() 相比)是使用批处理:

import tensorflow as tf

ITEMS_PER_FILE = 100 # Assuming we are saving 100 items per .tfrecord file


raw_dataset = tf.data.TFRecordDataset('in.tfrecord')

batch_idx = 0
for batch in raw_dataset.batch(ITEMS_PER_FILE):

    # Converting `batch` back into a `Dataset`, assuming batch is a `tuple` of `tensors`
    batch_ds = tf.data.Dataset.from_tensor_slices(tuple([*batch]))
    filename = f'out.tfrecord.batch_idx:03d'

    writer = tf.data.experimental.TFRecordWriter(filename)
    writer.write(batch_ds)

    batch_idx += 1

【讨论】:

【参考方案4】:

TensorFlow 2.x 非常有效的方法

正如@yongjieyongjie 所述,您应该使用.batch() 而不是.shard(),以避免根据需要更频繁地迭代数据集。 但是如果你有一个非常大的数据集,对内存来说太大了,它会失败(但不会出错),只给你一些文件和原始数据集的一小部分。

首先,您应该对数据集进行批处理,并使用您希望每个文件拥有的记录数量作为批处理大小(我假设您的数据集已经是序列化格式,否则请参阅here)。

dataset = dataset.batch(ITEMS_PER_FILE)

接下来要做的事情是使用生成器来避免内存不足。

def write_generator():
    i = 0
    iterator = iter(dataset)
    optional = iterator.get_next_as_optional()
    while optional.has_value().numpy():
        ds = optional.get_value()
        optional = iterator.get_next_as_optional()
        batch_ds = tf.data.Dataset.from_tensor_slices(ds)
        writer = tf.data.experimental.TFRecordWriter(save_to + "\\" + name + "-" + str(i) + ".tfrecord", compression_type='GZIP')#compression_type='GZIP'
        i += 1
        yield batch_ds, writer, i
    return

现在只需在普通的 for 循环中使用生成器

for data, wri, i in write_generator():
    start_time = time.time()
    wri.write(data)
    print("Time needed: ", time.time() - start_time, "s", "\t", NAME_OF_FILES + "-" + str(i) + ".tfrecord")

只要一个文件适合内存中的原始文件,这应该可以正常工作。

【讨论】:

【参考方案5】:

不均匀的分割

如果您想平均分成大小相等的文件,大多数其他答案都可以使用。这适用于不均匀的分割:

# `splits` is a list of the number of records you want in each output file
def split_files(filename: str, splits: List[int]) -> None:
    dataset: tf.data.Dataset = tf.data.TFRecordDataset(filename)
    rec_counter: int = 0

    # An extra iteration over the data to get the size
    total_records: int = len([r for r in dataset])
    print(f"Found total_records records in source file.")

    if sum(splits) != total_records:
        raise ValueError(f"Sum of splits sum(splits) does not equal "
                         f"total number of records total_records")

    rec_iter:Iterator = iter(dataset)
    split: int
    for split_idx, split in enumerate(splits):
        outfile: str = filename + f".split_idx-split"
        with tf.io.TFRecordWriter(outfile) as writer:
            for out_idx in range(split):
                rec: tf.Tensor = next(rec_iter, None)
                rec_counter +=1
                writer.write(rec.numpy())
        print(f"Finished writing split records to file split_idx")

虽然我认为从技术上讲,OP 询问了without writing back each Dataset example(这就是它的作用),但这至少是在不反序列化每个示例的情况下做到的。

对于非常大的文件,它有点慢。可能有一种方法可以修改其他一些基于批处理的答案,以便使用批处理输入读取但仍然写入不均匀拆分,但我没有尝试过。

【讨论】:

【参考方案6】:

分成 N 次分割 (在 tensorflow 1.13.1 中测试)

import os
import hashlib
import tensorflow as tf
from tqdm import tqdm


def split_tfrecord(tfrecord_path, n_splits):
    dataset = tf.data.TFRecordDataset(tfrecord_path)
    outfiles=[]
    for n_split in range(n_splits):
        output_tfrecord_dir = f"os.path.splitext(tfrecord_path)[0]"
        if not os.path.exists(output_tfrecord_dir):
            os.makedirs(output_tfrecord_dir)
        output_tfrecord_path=os.path.join(output_tfrecord_dir, f"n_split:03d.tfrecord")
        out_f = tf.io.TFRecordWriter(output_tfrecord_path)
        outfiles.append(out_f)

    for record in tqdm(dataset):
        sample = tf.train.Example()
        record = record.numpy()
        sample.ParseFromString(record)

        idx = int(hashlib.sha1(record).hexdigest(),16) % n_splits
        outfiles[idx].write(example.SerializeToString())

    for file in outfiles:
        file.close()

【讨论】:

以上是关于将 .tfrecords 文件拆分为多个 .tfrecords 文件的主要内容,如果未能解决你的问题,请参考以下文章

tensorflow二进制文件读取与tfrecords文件读取

『TensorFlow』TFR数据预处理探究以及框架搭建

将队列拆分为训练/测试集

将图像/掩码对转换为tfrecord

吴裕雄--天生自然 pythonTensorFlow图形数据处理:数据集高层操作

如何有效地将 Pandas Dataframe 保存到一个/多个 TFRecord 文件中?