如何有效地将 Pandas Dataframe 保存到一个/多个 TFRecord 文件中?

Posted

技术标签:

【中文标题】如何有效地将 Pandas Dataframe 保存到一个/多个 TFRecord 文件中?【英文标题】:How to efficiently save a Pandas Dataframe into one/more TFRecord file? 【发布时间】:2018-03-22 14:32:49 【问题描述】:

首先,我想快速介绍一些背景知识。我最终想要实现的是在 tensorflow 框架下为多类分类问题训练一个全连接的神经网络。

这个问题的挑战在于训练数据的大小是巨大的(~ 2 TB)。为了让训练在有限的内存下工作,我想将训练集保存到小文件中,并使用小批量梯度下降算法来训练模型。 (每次只有一个或几个文件被加载到内存中)。

现在假设我已经有两个包含已处理数据的数据框,一个带有 X_train(700 万个条目 * 200 个带有列名的特征),另一个带有 training_y(700 万个条目 * 1 个标签)。 如何有效地将其保存到 TFrecord 文件中,保留列名、行索引等,并且我可能希望每个文件包含 100,000 个条目?我知道我可以利用 TFrecord 下的所有内容一些在 tensorflow 中实现的整洁的洗牌和批处理功能。我可能需要一种非常有效的方法来写入此类记录,因为稍后我需要将 2TB 的数据写入此文件格式。

我尝试在 Google 上搜索“如何将 pandas 数据帧写入 TFRecords”,但没有找到任何好的示例。大多数示例要求我逐列、逐行创建tf.train.Example,并使用tf.python_io.TFRecordWriter 写入tfrecord 文件。只是想确认这是我能在这里得到的最好的结果。

如果您对我要解决的问题有其他建议,我们也将不胜感激!

【问题讨论】:

【参考方案1】:

您可以查看here 将 pandas df 写入 tfRecord

安装 pandas-tfrecords

pip install pandas-tfrecords

试试

import pandas as pd
from pandas_tfrecords import pd2tf, tf2pd

df = pd.DataFrame('A': [1, 2, 3], 'B': ['a', 'b', 'c'], 'C': [[1, 2], [3, 4], [5, 6]])

# local
pd2tf(df, './tfrecords')
my_df = tf2pd('./tfrecords')

希望这会有所帮助。

【讨论】:

【参考方案2】:

一种可行的解决方法是将pandas 数据框导出到镶木地板文件。这是有效存储数据的最佳方法之一,因为数据将被分割成一些文件。

您甚至可以决定将哪一列用于分区,以便该列的每个唯一值都将进入一个文件。更多信息to_parquet pandas doc。

然后您可以使用这些分区进行批处理。

【讨论】:

【参考方案3】:

如果您只使用tensorflowpandas,将文件转换为TFRecords(不幸的是)相当复杂。由于其他答案提供了避免这种情况的巧妙方法,我将展示如何仅使用 tensorflowpandas 进行转换,如果只是为了完整起见。

触发警告:很多 TF 样板。您已收到警告。

import pandas as pd
import tensorflow as tf

#Creating fake data for demonstration
X_train = pd.DataFrame('feat1':[1,2,3], 
                  'feat2':['one','two','three'])
training_y = pd.DataFrame('target': [3.4, 11.67, 44444.1])

X_train.to_csv('X_train.csv')
training_y.to_csv('training_y.csv')

#TFRecords boilerplate
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def serialize_example(index, feat1, feat2, target):
    """
    Creates a tf.train.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.train.Example-compatible
    # data type.
    feature = 
      'index': _int64_feature(index),
      'feat1': _int64_feature(feat1),
      'feat2': _bytes_feature(feat2),
      'target': _float_feature(target)
    
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

#Loading the data into chunks of size 2.  Change this to 1e5 in your code
CHUNKSIZE = 2
train = pd.read_csv('X_train.csv', chunksize=CHUNKSIZE)
y = pd.read_csv('training_y.csv', chunksize=CHUNKSIZE)

file_num = 0
while 1:
    try:
        print(f'file_num')
        #Getting the data from the two files 
        df = pd.concat([train.get_chunk(), y.get_chunk()],1)
        
        #Writing the TFRecord
        with tf.io.TFRecordWriter(f'Record_file_num.tfrec') as writer:
            for k in range(df.shape[0]):
                row = df.iloc[k,:]
                example = serialize_example(
                    df.index[k],
                    row['feat1'],
                    str.encode(row['feat2']), #Note the str.encode to make tf play nice with strings
                    row['target']) 
                writer.write(example)    
        file_num += 1
    except:
        print(f'ERROR: sys.exc_info()[0]')
        break

上面的代码使用pandas.read_csvchunksize 参数分块加载文件。如果您的文件不是 csv,请检查相应的 pandas read_filetype 是否具有 chunksize 参数。

在写这篇文章时,我非常依赖Chris Deotte's How to Create TFRecords kernel。我试过the official documentation,但他们没有提到如何让 tf.io 读取你的 pandas 字符串。这让生活变得更加艰难。

如果出于某种原因,您需要检查 TFRecords 内部以确保数据正确,您将需要更多样板文件。享受吧。

#Reading the TFRecord
def read_tfrecord(example):
    LABELED_TFREC_FORMAT = 
        "index": tf.io.FixedLenFeature([], tf.int64), 
        "feat1": tf.io.FixedLenFeature([], tf.int64),
        "feat2": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.float32)
    
    
    example = tf.io.parse_single_example(example, LABELED_TFREC_FORMAT)
    index = example['index']
    feat1 = example['feat1']
    feat2 = example['feat2']
    target = example['target']
    return index, feat1, feat2, target 

def load_dataset(filenames, labeled=True, ordered=False):
    # Read from TFRecords. For optimal performance, reading from multiple files at once and
    # disregarding data order. Order does not matter since we will be shuffling the data anyway.

    ignore_order = tf.data.Options()
    if not ordered:
        ignore_order.experimental_deterministic = False # disable order, increase speed

    dataset = tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO) # automatically interleaves reads from multiple files
    dataset = dataset.with_options(ignore_order) # uses data as soon as it streams in, rather than in its original order
    dataset = dataset.map(read_tfrecord)
    # returns a dataset of (image, label) pairs if labeled=True or (image, id) pairs if labeled=False
    return dataset

AUTO = tf.data.experimental.AUTOTUNE
def get_training_dataset(filenames, batch_size=2):
    dataset = load_dataset(filenames, labeled=True)
    dataset = dataset.repeat() # the training dataset must repeat for several epochs
    #dataset = dataset.shuffle(2048)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(AUTO) # prefetch next batch while training (autotune prefetch buffer size)
    return dataset

training_dataset = get_training_dataset(filenames= ['Record_0.tfrec', 'Record_1.tfrec'])
#training_dataset = training_dataset.unbatch().batch(20)
next(iter(training_dataset))

(,

,

,

,

)

【讨论】:

以上是关于如何有效地将 Pandas Dataframe 保存到一个/多个 TFRecord 文件中?的主要内容,如果未能解决你的问题,请参考以下文章

有效地将值从一列替换到另一列 Pandas DataFrame

有效地将Pandas数据帧写入Google BigQuery

pandas 有效地将 DataFrames 与不匹配的分类列和 MultiIndex 级别连接起来

从 Numpy 3d 数组有效地创建 Pandas DataFrame

如何阻止 Pandas DataFrame 无缘无故地将 int 转换为 float?

Python pandas 通过 dt 访问器有效地将日期时间转换为时间戳