如何为我的 tensorflow 模型提高此数据管道的性能
Posted
技术标签:
【中文标题】如何为我的 tensorflow 模型提高此数据管道的性能【英文标题】:How to improve the performance of this data pipeline for my tensorflow model 【发布时间】:2019-04-24 17:39:43 【问题描述】:我有一个张量流模型,我正在 google-colab 上进行训练。实际模型更复杂,但我将其浓缩为reproducible example(去掉了保存/恢复、学习率衰减、断言、张量板事件、梯度裁剪等)。该模型工作合理(收敛到可接受的损失),我正在寻找加快训练速度的方法(每秒迭代次数)。
目前在 colab 的 GPU 上训练 1000 次迭代需要 10 分钟。我当前的批量大小为 512,这意味着该模型每秒处理 ~850 个示例(我希望批量大小为 512,除非其他大小提供合理的加速。改变批量大小本身并不会改变速度)。
所以目前我有一个以 tfrecord 格式存储的数据:这是一个500Mb example file,总数据大小约为 0.5Tb。这些数据通过了一个相当繁重的预处理步骤(我不能事先进行预处理,因为它会使我的 tfrecords 的大小大大超出我的承受能力)。预处理通过tf.data 完成,输出张量((batch_size, 8, 8, 24)
被视为 NHWC,(batch_size, 10)
)被传递到模型中。示例 colab 不包含仅用作示例的简化模型。
我尝试了一些方法来加快训练速度:
manual device placement(在 cpu 上进行数据预处理,在 gpu 上进行传播),但我所有的尝试都导致速度变差(从 10% 提高到 50%)。 改进数据预处理。我查看了tf.data video 和data tutorials。我尝试了该教程中的几乎所有技术都没有任何改进(速度从 0% 降低到 15%)。特别是我尝试过:dataset.prefetch(...)
将num_parallel_calls
传递给地图
在tf.contrib.data.map_and_batch
中结合map和batch
使用parallel_interleave
数据预处理相关的代码在这里(这里是full reproducible example和example data):
_keys_to_map =
'd': tf.FixedLenFeature([], tf.string), # data
's': tf.FixedLenFeature([], tf.int64), # score
def _parser(record):][3]
parsed = tf.parse_single_example(record, _keys_to_map)
return parsed['d'], parsed['s']
def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)
with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
ds = ds.map(_parser) # map them based on tfrecord format
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat() # iterate infinitely
return ds.make_initializable_iterator() # initialize the iterator
def iterator_to_data(iterator):
"""Creates a part of the graph which reads the raw data from an iterator and transforms it to a
data ready to be passed to model.
Args:
iterator - iterator. Created by `init_tfrecord_dataset`
Returns:
data_board - (BATCH_SIZE, 8, 8, 24) you can think about as NWHC for images.
data_flags - (BATCH_SIZE, 10)
combined_score - (BATCH_SIZE,)
"""
b = tf.constant((128, 64, 32, 16, 8, 4, 2, 1), dtype=tf.uint8, name='unpacked_const')
with tf.name_scope('tfr_parse'):
with tf.name_scope('packed_data'):
next_element = iterator.get_next()
data_packed, score_int = next_element
score = tf.cast(score_int, tf.float64, name='score_float')
# https://***.com/q/45454470/1090562
with tf.name_scope('data_unpacked'):
data_unpacked = tf.reshape(tf.mod(tf.to_int32(tf.decode_raw(data_packed, tf.uint8)[:,:,None] // b), 2), [BATCH_SIZE, 1552], name='data_unpack')
with tf.name_scope('score'):
with tf.name_scope('is_mate'):
score_is_mate = tf.cast(tf.squeeze(tf.slice(data_unpacked, [0, 1546], [BATCH_SIZE, 1])), tf.float64, name='is_mate')
with tf.name_scope('combined'):
combined_score = (1 - score_is_mate) * VALUE_A * tf.tanh(score / VALUE_K) + score_is_mate * tf.sign(score) * (VALUE_A + (1 - VALUE_A) / (VALUE_B - 1) * tf.reduce_max(tf.stack([tf.zeros(BATCH_SIZE, dtype=tf.float64), VALUE_B - tf.abs(score)]), axis=0))
with tf.name_scope('board'):
with tf.name_scope('reshape_layers'):
data_board = tf.reshape(tf.slice(data_unpacked, [0, 0], [BATCH_SIZE, 8 * 8 * 24]), [BATCH_SIZE, 8, 8, 24], name='board_reshape')
with tf.name_scope('combine_layers'):
data_board = tf.cast(tf.stack([
data_board[:,:,:, 0],
data_board[:,:,:, 4],
data_board[:,:,:, 8],
data_board[:,:,:,12],
data_board[:,:,:,16],
data_board[:,:,:,20],
- data_board[:,:,:, 1],
- data_board[:,:,:, 5],
- data_board[:,:,:, 9],
- data_board[:,:,:,13],
- data_board[:,:,:,17],
- data_board[:,:,:,21],
data_board[:,:,:, 2],
data_board[:,:,:, 6],
data_board[:,:,:,10],
data_board[:,:,:,14],
data_board[:,:,:,18],
data_board[:,:,:,22],
- data_board[:,:,:, 3],
- data_board[:,:,:, 7],
- data_board[:,:,:,11],
- data_board[:,:,:,15],
- data_board[:,:,:,19],
- data_board[:,:,:,23],
], axis=3), tf.float64, name='board_compact')
with tf.name_scope('flags'):
data_flags = tf.cast(tf.slice(data_unpacked, [0, 1536], [BATCH_SIZE, 10]), tf.float64, name='flags')
return data_board, data_flags, combined_score
我正在寻找可以提高训练速度(以示例/秒计)的实用解决方案(我已经尝试了大量的理论想法)。我不是在寻找提高模型准确性(或修改模型)的方法,因为这只是一个测试模型。
我花了很多时间尝试优化这个(并且放弃了)。所以我很乐意奖励 200 赏金,奖励一个有很好解释的可行解决方案。
【问题讨论】:
你是从驱动器读取 tfrecords 吗? @mlRocks 是的,我正在从 gDrive 读取它。您实际上可以在问题的完整可重现链接中查看完整的实现。 这可能会有所帮助:tensorflow.org/guide/performance/…github.com/tensorflow/tensorflow/issues/14857 @SalvadorDali 这是已知问题。因为它不是连接到计算机的物理驱动器,所以读取它会很慢 【参考方案1】:我有几个建议:
1) 创建批次后,整个批次由iterator_to_data()
函数处理。这并不是真正在多个线程上分配任务,至少不是在 api 级别。相反,您可以在 init_tfrecord_dataset()
函数中尝试这样的操作:
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
ds = ds.map(_parser)
ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat()
您可能还想更改 iterator_to_data() 函数中的几行,因为输入参数不是具有上述更改的迭代器。
2) 您可能还想使用tf.train.ProfilerHook
之类的方式获取分析信息。这可以告诉您瓶颈是cpu还是gpu。例如,如果瓶颈在于 CPU,您可能会看到 GPU 操作正在等待 memcpyHtoD 操作完成。
【讨论】:
【参考方案2】:suggestion from hampi 用于描述您的培训工作是一个不错的选择,可能需要了解您的管道中的实际瓶颈。 Input Pipeline performance guide 中的其他建议也应该很有用。
但是,还有另一种可能有用的“快速修复”。在某些情况下,Dataset.map()
转换中的工作量可能非常小,并且主要由为每个元素调用函数的成本决定。在这些情况下,我们经常尝试矢量化 map 函数,并在Dataset.batch()
转换之后移动它,以减少调用函数的次数(在这种情况下为 1/512 次) ,并在每个批次上执行更大且可能更容易并行化的操作。幸运的是,您的管道可以按如下方式进行矢量化:
def _batch_parser(record_batch):
# NOTE: Use `tf.parse_example()` to operate on batches of records.
parsed = tf.parse_example(record_batch, _keys_to_map)
return parsed['d'], parsed['s']
def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)
with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
# NOTE: Change begins here.
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.map(_batch_parser) # map batches based on tfrecord format
# NOTE: Change ends here.
ds = ds.repeat() # iterate infinitely
return ds.make_initializable_iterator() # initialize the iterator
目前,矢量化是您必须手动进行的更改,但tf.data
团队正在处理an optimization pass that provides automatic vectorization。
【讨论】:
非常感谢。根据我的测试(在更大的数据集上),这给了我约 3-5% 的加速(不确定这是统计显着还是随机波动)。没有我希望的那么多,但对于 3 行更改非常好。 @mrry 我的 TFRecords 中有非标量特征。虽然您的解决方案适用于标量功能,但我没有通过io.parse_tensor
管理解析张量。我遇到以下警告:WARNING:tensorflow:Using a while_loop for converting ParseTensor
。您对如何解决这个问题有什么建议吗?以上是关于如何为我的 tensorflow 模型提高此数据管道的性能的主要内容,如果未能解决你的问题,请参考以下文章
如何为 keras 模型使用 tensorflow 自定义损失?
TensorFlow 2.0 Keras:如何为TensorBoard编写图像摘要
TensorFlow 2.0 Keras:如何为 TensorBoard 编写图像摘要