如何*实际上*在 TensorFlow 中读取 CSV 数据?
Posted
技术标签:
【中文标题】如何*实际上*在 TensorFlow 中读取 CSV 数据?【英文标题】:How to *actually* read CSV data in TensorFlow? 【发布时间】:2016-09-02 16:18:13 【问题描述】:我对 TensorFlow 的世界还比较陌生,并且对您如何实际上将 CSV 数据读入 TensorFlow 中的可用示例/标签张量感到非常困惑。 TensorFlow tutorial on reading CSV data 中的示例非常分散,只能让您在 CSV 数据上进行训练。
这是我根据 CSV 教程拼凑的代码:
from __future__ import print_function
import tensorflow as tf
def file_len(fname):
with open(fname) as f:
for i, l in enumerate(f):
pass
return i + 1
filename = "csv_test_data.csv"
# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)
# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)
# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])
print("loading, " + str(file_length) + " line(s)\n")
with tf.Session() as sess:
tf.initialize_all_variables().run()
# start populating filename queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(file_length):
# retrieve a single instance
example, label = sess.run([features, col5])
print(example, label)
coord.request_stop()
coord.join(threads)
print("\ndone loading")
这是我正在加载的 CSV 文件中的一个简短示例 - 非常基本的数据 - 4 个特征列和 1 个标签列:
0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0
上面的所有代码都是从 CSV 文件中逐个打印每个示例,虽然不错,但对训练毫无用处。
我在这里苦苦挣扎的是,您实际上如何将这些单独的示例,一个一个地加载,转化为训练数据集。例如,here's a notebook 我正在学习 Udacity 深度学习课程。我基本上想获取我正在加载的 CSV 数据,并将其放入 train_dataset 和 train_labels 之类的东西中:
def reformat(dataset, labels):
dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
# Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)
我试过像这样使用tf.train.shuffle_batch
,但它只是莫名其妙地挂起:
for i in range(file_length):
# retrieve a single instance
example, label = sess.run([features, colRelevant])
example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
print(example, label)
所以总结一下,这是我的问题:
我在这个过程中遗漏了什么? 感觉我缺少一些关于如何正确构建输入管道的关键直觉。 有没有办法避免知道 CSV 文件的长度? 必须知道要处理的行数感觉很不雅(上面的for i in range(file_length)
代码行)
编辑: 一旦雅罗斯拉夫指出我可能在这里混淆了命令式和图形构造部分,它就开始变得更加清晰。我能够汇总以下代码,我认为这更接近从 CSV 训练模型时通常会执行的操作(不包括任何模型训练代码):
from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()
def file_len(fname):
with open(fname) as f:
for i, l in enumerate(f):
pass
return i + 1
def read_from_csv(filename_queue):
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)
record_defaults = [[0],[0],[0],[0],[0]]
colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
features = tf.stack([colHour,colQuarter,colAction,colUser])
label = tf.stack([colLabel])
return features, label
def input_pipeline(batch_size, num_epochs=None):
filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)
example, label = read_from_csv(filename_queue)
min_after_dequeue = 10000
capacity = min_after_dequeue + 3 * batch_size
example_batch, label_batch = tf.train.shuffle_batch(
[example, label], batch_size=batch_size, capacity=capacity,
min_after_dequeue=min_after_dequeue)
return example_batch, label_batch
file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)
with tf.Session() as sess:
tf.initialize_all_variables().run()
# start populating filename queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
try:
while not coord.should_stop():
example_batch, label_batch = sess.run([examples, labels])
print(example_batch)
except tf.errors.OutOfRangeError:
print('Done training, epoch reached')
finally:
coord.request_stop()
coord.join(threads)
【问题讨论】:
我一直在尝试您的代码,但无法使其正常工作。你确定有什么我想念的吗?谢谢。我在这里发布了一个帖子,以便您了解更多详细信息:***.com/questions/40143019/… 【参考方案1】:我认为您在这里混淆了命令式和图形构造部分。操作tf.train.shuffle_batch
创建了一个新的队列节点,单个节点可以用来处理整个数据集。所以我认为你挂了,因为你在 for 循环中创建了一堆 shuffle_batch
队列并且没有为它们启动队列运行器。
正常的输入管道用法如下所示:
-
将
shuffle_batch
等节点添加到输入管道
(可选,防止无意修改图)最终确定图
--- 图构建结束,命令式编程开始 --
tf.start_queue_runners
while(True): session.run()
为了更具可扩展性(避免使用 Python GIL),您可以使用 TensorFlow 管道生成所有数据。但是,如果性能不重要,您可以使用 slice_input_producer.
将 numpy 数组连接到输入管道这是一个示例,其中包含一些 Print
节点以查看发生了什么(Print
中的消息在节点为运行)
tf.reset_default_graph()
num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print data
(data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.start_queue_runners()
try:
while True:
print sess.run(data_batch_debug)
except tf.errors.OutOfRangeError as e:
print "No more inputs."
你应该看到这样的东西
[[0 1]
[2 3]
[4 5]
[6 7]
[8 9]]
[[0 1]
[2 3]]
[[4 5]
[6 7]]
No more inputs.
“8, 9”数字没有填满整个批次,所以它们没有被生产出来。 tf.Print
也打印到 sys.stdout 中,因此它们分别显示在终端中。
PS:将batch
连接到手动初始化队列的最低限度在github issue 2193
此外,出于调试目的,您可能希望在会话中设置timeout
,这样您的 IPython 笔记本就不会挂在空队列出列上。我在会话中使用这个辅助函数
def create_session():
config = tf.ConfigProto(log_device_placement=True)
config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM
config.operation_timeout_in_ms=60000 # terminate on long hangs
# create interactive session to register a default session
sess = tf.InteractiveSession("", config=config)
return sess
可扩展性说明:
tf.constant
将您的数据副本内联到图表中。 Graph 定义的大小有 2GB 的基本限制,因此这是数据大小的上限
您可以通过使用 v=tf.Variable
并通过在右侧运行带有 tf.placeholder
的 v.assign_op
并将 numpy 数组提供给占位符 (feed_dict
) 来将数据保存到该限制中
这仍然会创建两个数据副本,因此为了节省内存,您可以制作自己的 slice_input_producer
版本,它在 numpy 数组上运行,并使用 feed_dict
一次上传一行
【讨论】:
啊,是的!你是完全正确的 - 一旦你说:“我认为你在这里混淆了命令式和图形构造部分”,我开始明白我哪里出错了。我已经发布了对我的问题的编辑,其中包括我放在一起的最新代码,这实际上让我更接近我想要的 - 我能够成功读取 CSV 数据并以我可以训练的方式对其进行批处理模型。 我建议更新此答案,以便它适用于最新版本的 TensorFlow:将tf.slice_input_producer()
替换为 tf.train.slice_input_producer()
(对于其他几个函数也是如此)。并且还在sess.run(tf.initialize_all_variables())
之后添加sess.run(tf.initialize_local_variables())
。
需要进行更多更改:pack()
现在是 stack()
,initialize_all_variables()
应替换为 global_variables_initializer()
和 local_variables_initializer()
。
使用 tensorflow 1.0.1,您需要将局部和全局变量初始化为 tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()).run()
。由于您使用的是 num_epochs 并且根据 documentation “注意:如果 num_epochs
不是 None
,此函数将创建本地计数器 epochs
,因此您需要初始化局部变量。”跨度>
【参考方案2】:
或者你可以试试这个,代码使用 pandas 和 numpy 将 Iris 数据集加载到 tensorflow 中,并在会话中打印一个简单的神经元输出。希望对基本的理解有所帮助.... [我还没有添加一个热解码标签的方式]。
import tensorflow as tf
import numpy
import pandas as pd
df=pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [0,1,2,3,4],skiprows = [0],header=None)
d = df.values
l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [5] ,header=None)
labels = l.values
data = numpy.float32(d)
labels = numpy.array(l,'str')
#print data, labels
#tensorflow
x = tf.placeholder(tf.float32,shape=(150,5))
x = data
w = tf.random_normal([100,150],mean=0.0, stddev=1.0, dtype=tf.float32)
y = tf.nn.softmax(tf.matmul(w,x))
with tf.Session() as sess:
print sess.run(y)
【讨论】:
这很有启发性,但如果我理解正确,它并没有显示如何使用数据进行训练...... 是的,我会尽快添加它们...这应该很简单不是吗...计算损失,运行优化器我会尽快添加它们 嗨,dividebyzero,对不起,我迟到了!我发现了另一个有趣的链接,并且真正缓解了问题tensorflow.org/tutorials/tflearn....在这里您可以加载 csv 文件,训练它们,执行分类... @NagarjunGururaj 我可以在普通的tensorflow例程中使用contrib_learn构建的数据集吗? 哪个数据集?你是说鸢尾花还是其他?【参考方案3】:您可以使用最新的 tf.data API:
dataset = tf.contrib.data.make_csv_dataset(filepath)
iterator = dataset.make_initializable_iterator()
columns = iterator.get_next()
with tf.Session() as sess:
sess.run([iteator.initializer])
【讨论】:
【参考方案4】:如果有人来这里寻找一种简单的方法来读取 tf.estimator API 中绝对大且分片的 CSV 文件,那么请看下面我的代码
CSV_COLUMNS = ['ID','text','class']
LABEL_COLUMN = 'class'
DEFAULTS = [['x'],['no'],[0]] #Default values
def read_dataset(filename, mode, batch_size = 512):
def _input_fn(v_test=False):
# def decode_csv(value_column):
# columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
# features = dict(zip(CSV_COLUMNS, columns))
# label = features.pop(LABEL_COLUMN)
# return add_engineered(features), label
# Create list of files that match pattern
file_list = tf.gfile.Glob(filename)
# Create dataset from file list
#dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
dataset = tf.contrib.data.make_csv_dataset(file_list,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
label_name=LABEL_COLUMN)
if mode == tf.estimator.ModeKeys.TRAIN:
num_epochs = None # indefinitely
dataset = dataset.shuffle(buffer_size = 10 * batch_size)
else:
num_epochs = 1 # end-of-input after this
batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()
#Begins - Uncomment for testing only -----------------------------------------------------<
if v_test == True:
with tf.Session() as sess:
print(sess.run(batch_features))
#End - Uncomment for testing only -----------------------------------------------------<
return add_engineered(batch_features), batch_labels
return _input_fn
TF.estimator 中的使用示例:
train_spec = tf.estimator.TrainSpec(input_fn = read_dataset(
filename = train_file,
mode = tf.estimator.ModeKeys.TRAIN,
batch_size = 128),
max_steps = num_train_steps)
【讨论】:
【参考方案5】:2.0 兼容解决方案:此答案可能由上述线程中的其他人提供,但我将提供有助于社区的其他链接。
dataset = tf.data.experimental.make_csv_dataset(
file_path,
batch_size=5, # Artificially small to make examples easier to show.
label_name=LABEL_COLUMN,
na_value="?",
num_epochs=1,
ignore_errors=True,
**kwargs)
更多信息请参考Tensorflow Tutorial。
【讨论】:
我发现这个答案(以及教程和文档)非常令人沮丧。用 OP 的话来说,它仍然只是“能够训练 CSV 数据的一部分”。它创建了一个“数据集”(但是什么类型 - 它甚至是 tf.data.Dataset?文档不清楚)并且数据集似乎是面向列的,而不是面向行的。大多数模型需要成批的行传递给他们进行训练——如何实现这一步?我问this question 寻求端到端的例子。 请提供 make_csv_dataset 的端到端示例,而不是仅仅放置抽象级别的文档!以上是关于如何*实际上*在 TensorFlow 中读取 CSV 数据?的主要内容,如果未能解决你的问题,请参考以下文章