Between-graph replication version of the PTB RNN model is slower than the single-GPU version (even in tf 1.0.0)
【Title】: Between-graph replication version of PTB rnn model is slower than single-gpu version (even in tf 1.0.0)
【Posted】: 2017-02-28 05:17:51
【Problem description】: I changed the PTB model (the one you can find in tensorflow/models/tutorials/rnn/ptb) into a between-graph replicated version, but this distributed version (with 1 ps server and 2 workers) shows no speedup, even though the ps and the workers run on a single machine. Timeline profiling shows significant latency between the GPU work and the CPU work in the distributed version. The code and the timeline figures follow:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import numpy as np
import tensorflow as tf
import reader
import tempfile
flags = tf.flags
logging = tf.logging
flags.DEFINE_string(
"model", "small",
"A type of model. Possible options are: small, medium, large.")
flags.DEFINE_string("data_path", None,
"Where the training/test data is stored.")
flags.DEFINE_string("save_path", None,
"Model output directory.")
flags.DEFINE_bool("use_fp16", False,
"Train using 16-bit floats instead of 32bit floats")
flags.DEFINE_string("ps_hosts","IP1:2222",
"Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "IP1:2223,IP1:2224",
"Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None,"job name: worker or ps")
flags.DEFINE_integer("task_index", None,
"Worker task index, should be >= 0. task_index=0 is "
"the master worker task the performs the variable "
"initialization ")
flags.DEFINE_integer("num_gpus", 1,
"Total number of gpus for each machine."
"If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
"Number of replicas to aggregate before parameter update"
"is applied (For sync_replicas mode only; default: "
"num_workers)")
flags.DEFINE_boolean("sync_replicas", False,
"Use the sync_replicas (synchronized replicas) mode, "
"wherein the parameter updates from workers are aggregated "
"before applied to avoid stale gradients")
flags.DEFINE_boolean(
"existing_servers", False, "Whether servers already exists. If True, "
"will use the worker hosts via their GRPC URLs (one client process "
"per worker host). Otherwise, will create an in-process TensorFlow "
"server.")
FLAGS = flags.FLAGS
def data_type():
return tf.float16 if FLAGS.use_fp16 else tf.float32
class PTBInput(object):
"""The input data."""
def __init__(self, config, data, ix, worker_num, name=None):
data_len = len(data) // worker_num
data = data[data_len * ix:data_len * (ix + 1)]
self.batch_size = batch_size = config.batch_size
self.num_steps = num_steps = config.num_steps
self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
self.input_data, self.targets = reader.ptb_producer(
data, batch_size, num_steps, name=name)
class PTBModel(object):
"""The PTB model."""
def __init__(self, is_training, config, input_, num_workers=0, global_step=None):
self._input = input_
batch_size = input_.batch_size
num_steps = input_.num_steps
size = config.hidden_size
vocab_size = config.vocab_size
# Slightly better results can be obtained with forget gate biases
# initialized to 1 but the hyperparameters of the model would need to be
# different than reported in the paper.
def lstm_cell():
# return tf.contrib.rnn.BasicLSTMCell(
return tf.nn.rnn_cell.BasicLSTMCell(
size, forget_bias=0.0, state_is_tuple=True)
attn_cell = lstm_cell
if is_training and config.keep_prob < 1:
def attn_cell():
return tf.contrib.rnn.DropoutWrapper(
lstm_cell(), output_keep_prob=config.keep_prob)
# cell = tf.contrib.rnn.MultiRNNCell(
cell = tf.nn.rnn_cell.MultiRNNCell(
[attn_cell() for _ in range(config.num_layers)], state_is_tuple=True)
self._initial_state = cell.zero_state(batch_size, data_type())
with tf.device("/cpu:0"):
embedding = tf.get_variable(
"embedding", [vocab_size, size], dtype=data_type())
inputs = tf.nn.embedding_lookup(embedding, input_.input_data)
if is_training and config.keep_prob < 1:
inputs = tf.nn.dropout(inputs, config.keep_prob)
# Simplified version of models/tutorials/rnn/rnn.py's rnn().
# This builds an unrolled LSTM for tutorial purposes only.
# In general, use the rnn() or state_saving_rnn() from rnn.py.
#
# The alternative version of the code below is:
#
# inputs = tf.unstack(inputs, num=num_steps, axis=1)
# outputs, state = tf.nn.rnn(cell, inputs,
# initial_state=self._initial_state)
outputs = []
state = self._initial_state
with tf.variable_scope("RNN"):
for time_step in range(num_steps):
if time_step > 0: tf.get_variable_scope().reuse_variables()
(cell_output, state) = cell(inputs[:, time_step, :], state)
outputs.append(cell_output)
output = tf.reshape(tf.concat(1, outputs), [-1, size])
softmax_w = tf.get_variable(
"softmax_w", [size, vocab_size], dtype=data_type())
softmax_b = tf.get_variable("softmax_b", [vocab_size], dtype=data_type())
logits = tf.matmul(output, softmax_w) + softmax_b
# loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
loss = tf.nn.seq2seq.sequence_loss_by_example(
[logits],
[tf.reshape(input_.targets, [-1])],
[tf.ones([batch_size * num_steps], dtype=data_type())])
self._cost = cost = tf.reduce_sum(loss) / batch_size
self._final_state = state
if not is_training:
return
self._lr = tf.Variable(0.0, trainable=False)
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars),
config.max_grad_norm)
self._opt = tf.train.GradientDescentOptimizer(self._lr)
if FLAGS.sync_replicas:
if FLAGS.replicas_to_aggregate is None:
replicas_to_aggregate = num_workers
else:
replicas_to_aggregate = FLAGS.replicas_to_aggregate
self._opt = tf.train.SyncReplicasOptimizer(
self._opt,
replicas_to_aggregate=replicas_to_aggregate,
total_num_replicas=num_workers,
name="ptb_sync_replicas")
# train_step = opt.minimize(cross_entropy, global_step=global_step)
self._train_op = self._opt.apply_gradients(
zip(grads, tvars),
global_step)
# global_step=tf.contrib.framework.get_or_create_global_step())
self._new_lr = tf.placeholder(
tf.float32, shape=[], name="new_learning_rate")
self._lr_update = tf.assign(self._lr, self._new_lr)
def assign_lr(self, session, lr_value):
session.run(self._lr_update, feed_dict={self._new_lr: lr_value})
@property
def input(self):
return self._input
@property
def initial_state(self):
return self._initial_state
@property
def cost(self):
return self._cost
@property
def final_state(self):
return self._final_state
@property
def lr(self):
return self._lr
@property
def opt(self):
return self._opt
@property
def train_op(self):
return self._train_op
class SmallConfig(object):
"""Small config."""
init_scale = 0.1
learning_rate = 1.0
max_grad_norm = 5
num_layers = 2
num_steps = 20
hidden_size = 200
max_epoch = 4
max_max_epoch = 13
keep_prob = 1.0
lr_decay = 0.5
batch_size = 20
vocab_size = 10000
class MediumConfig(object):
"""Medium config."""
init_scale = 0.05
learning_rate = 1.0
max_grad_norm = 5
num_layers = 2
num_steps = 35
hidden_size = 650
max_epoch = 6
max_max_epoch = 39
keep_prob = 0.5
lr_decay = 0.8
batch_size = 20
vocab_size = 10000
class LargeConfig(object):
"""Large config."""
init_scale = 0.04
learning_rate = 1.0
max_grad_norm = 10
num_layers = 2
num_steps = 35
hidden_size = 1500
max_epoch = 14
max_max_epoch = 55
keep_prob = 0.35
lr_decay = 1 / 1.15
batch_size = 20
vocab_size = 10000
class TestConfig(object):
"""Tiny config, for testing."""
init_scale = 0.1
learning_rate = 1.0
max_grad_norm = 1
num_layers = 1
num_steps = 2
hidden_size = 2
max_epoch = 1
max_max_epoch = 1
keep_prob = 1.0
lr_decay = 0.5
batch_size = 20
vocab_size = 10000
def run_epoch(session, model, global_step, eval_op=None, verbose=False):
"""Runs the model on the given data."""
start_time = time.time()
costs = 0.0
iters = 0
state = session.run(model.initial_state)
fetches = {
    "cost": model.cost,
    "final_state": model.final_state,
    "global_step": global_step,
}
if eval_op is not None:
fetches["eval_op"] = eval_op
for step in range(model.input.epoch_size):
feed_dict = {}
for i, (c, h) in enumerate(model.initial_state):
feed_dict[c] = state[i].c
feed_dict[h] = state[i].h
vals = session.run(fetches, feed_dict)
cost = vals["cost"]
state = vals["final_state"]
costs += cost
iters += model.input.num_steps
if verbose and step % (model.input.epoch_size // 10) == 10:
print("%.3f perplexity: %.3f speed: %.0f wps" %
(step * 1.0 / model.input.epoch_size, np.exp(costs / iters),
iters * model.input.batch_size / (time.time() - start_time)))
print("esize is %.3f, one epoch time: %.0f s" % (step,(time.time() - start_time)))
return np.exp(costs / iters)
def get_config():
if FLAGS.model == "small":
return SmallConfig()
elif FLAGS.model == "medium":
return MediumConfig()
elif FLAGS.model == "large":
return LargeConfig()
elif FLAGS.model == "test":
return TestConfig()
else:
raise ValueError("Invalid model: %s", FLAGS.model)
def main(_):
if not FLAGS.data_path:
raise ValueError("Must set --data_path to PTB data directory")
if FLAGS.job_name is None or FLAGS.job_name == "":
raise ValueError("Must specify an explicit `job_name`")
if FLAGS.task_index is None or FLAGS.task_index =="":
raise ValueError("Must specify an explicit `task_index`")
print("job name = %s" % FLAGS.job_name)
print("task index = %d" % FLAGS.task_index)
#Construct the cluster and start the server
ps_spec = FLAGS.ps_hosts.split(",")
worker_spec = FLAGS.worker_hosts.split(",")
# Get the number of workers.
num_workers = len(worker_spec)
# Build the cluster spec from the ps and worker host lists.
cluster = tf.train.ClusterSpec({
    "ps": ps_spec,
    "worker": worker_spec})
if not FLAGS.existing_servers:
# Not using existing servers. Create an in-process server.
server = tf.train.Server(
cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
is_chief = (FLAGS.task_index == 0)
if FLAGS.num_gpus > 0:
# if FLAGS.num_gpus < num_workers:
# raise ValueError("number of gpus is less than number of workers")
# Avoid gpu allocation conflict: now allocate task_num -> #gpu
# for each worker in the corresponding machine
gpu = 0 # (FLAGS.task_index % FLAGS.num_gpus)
worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
elif FLAGS.num_gpus == 0:
# Just allocate the CPU to worker server
cpu = 0
worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)
# The device setter will automatically place Variables ops on separate
# parameter servers (ps). The non-Variable ops will be placed on the workers.
# The ps use CPU and workers use corresponding GPU
raw_data = reader.ptb_raw_data(FLAGS.data_path)
train_data, valid_data, test_data, _ = raw_data
config = get_config()
eval_config = get_config()
eval_config.batch_size = 1
eval_config.num_steps = 1
# with tf.Graph().as_default():
with tf.device(
tf.train.replica_device_setter(
worker_device=worker_device,
ps_device="/job:ps/cpu:0",
cluster=cluster)):
'''raw_data = reader.ptb_raw_data(FLAGS.data_path)
train_data, valid_data, test_data, _ = raw_data
config = get_config()
eval_config = get_config()
eval_config.batch_size = 1
eval_config.num_steps = 1'''
# with tf.Graph().as_default():
global_step = tf.Variable(0, name="global_step", trainable=False)
initializer = tf.random_uniform_initializer(-config.init_scale,
config.init_scale)
with tf.name_scope("Train"):
train_input = PTBInput(config=config, data=train_data,
ix=FLAGS.task_index, worker_num=num_workers, name="TrainInput")
with tf.variable_scope("Model", reuse=None, initializer=initializer):
m = PTBModel(is_training=True, config=config, input_=train_input, num_workers=num_workers,
global_step = global_step)
tf.scalar_summary("Training Loss", m.cost)
tf.scalar_summary("Learning Rate", m.lr)
if FLAGS.sync_replicas:
local_init_op = m.opt.local_step_init_op
if is_chief:
local_init_op = m.opt.chief_init_op
ready_for_local_init_op = m.opt.ready_for_local_init_op
# Initial token and chief queue runners required by the sync_replicas mode
chief_queue_runner = m.opt.get_chief_queue_runner()
sync_init_op = m.opt.get_init_tokens_op()
# init_op = tf.global_variables_initializer()
init_op = tf.initialize_all_variables()
train_dir = tempfile.mkdtemp()
with tf.name_scope("Valid"):
valid_input = PTBInput(config=config, data=valid_data,
ix=FLAGS.task_index, worker_num=num_workers, name="ValidInput")
with tf.variable_scope("Model", reuse=True, initializer=initializer):
mvalid = PTBModel(is_training=False, config=config, input_=valid_input, num_workers=num_workers,
global_step=global_step)
tf.scalar_summary("Validation Loss", mvalid.cost)
with tf.name_scope("Test"):
test_input = PTBInput(config=eval_config, data=test_data,
ix=0, worker_num=1, name="TestInput")
with tf.variable_scope("Model", reuse=True, initializer=initializer):
mtest = PTBModel(is_training=False, config=eval_config,
input_=test_input, num_workers=num_workers,
global_step=global_step)
if FLAGS.sync_replicas:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
local_init_op=local_init_op,
ready_for_local_init_op=ready_for_local_init_op,
recovery_wait_secs=1,
global_step=global_step)
else:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
recovery_wait_secs=1,
global_step=global_step)
sess_config = tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=False,
device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])
# The chief worker (task_index==0) session will prepare the session,
# while the remaining workers will wait for the preparation to complete.
if is_chief:
print("Worker %d: Initializing session..." % FLAGS.task_index)
else:
print("Worker %d: Waiting for session to be initialized..." %
FLAGS.task_index)
if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)
session = sv.prepare_or_wait_for_session(server_grpc_url,
config=sess_config)
else:
session = sv.prepare_or_wait_for_session(server.target, config=sess_config)
print("Worker %d: Session initialization complete." % FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
# Chief worker will start the chief queue runner and call the init op.
session.run(sync_init_op)
sv.start_queue_runners(session, [chief_queue_runner])
# sv = tf.train.Supervisor(logdir=FLAGS.save_path)
# with sv.managed_session() as session:
for i in range(config.max_max_epoch):
lr_decay = config.lr_decay ** max(i + 1 - config.max_epoch, 0.0)
m.assign_lr(session, config.learning_rate * lr_decay)
print("Epoch: %d Learning rate: %.3f" % (i + 1, session.run(m.lr)))
train_perplexity = run_epoch(session, m, global_step, eval_op=m.train_op,
verbose=True)
print("Epoch: %d Train Perplexity: %.3f" % (i + 1, train_perplexity))
valid_perplexity = run_epoch(session, mvalid, global_step)
print("Epoch: %d Valid Perplexity: %.3f" % (i + 1, valid_perplexity))
test_perplexity = run_epoch(session, mtest, global_step)
print("Test Perplexity: %.3f" % test_perplexity)
if FLAGS.save_path:
print("Saving model to %s." % FLAGS.save_path)
sv.saver.save(session, FLAGS.save_path, global_step=sv.global_step)
if __name__ == "__main__":
tf.app.run()
The timeline of the original single-GPU version is as follows (one iteration):
[timeline screenshot omitted]
The timeline of worker 0 in the distributed version (1 ps server, 2 workers) is as follows (one iteration); worker 1's timeline is similar:
[timeline screenshot omitted]
The machine has two Tesla M40 GPUs. The single-GPU version runs at about 11000 wps (roughly 60% GPU utilization), while the between-graph version reaches only 6000 wps per worker (GPU utilization just over 30% on each GPU), so the speedup from two workers (two GPUs) is only 2 × 6000 / 11000 ≈ 1.09. Meanwhile, I also wrote a multi-GPU version of the PTB model (without any distributed between-graph or in-graph framework), and it gets a 1.6+ speedup on the same machine. So what causes the poor performance of the distributed version?
Run commands:
PS: CUDA_VISIBLE_DEVICES="" python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=ps --task_index=0
Worker 0: CUDA_VISIBLE_DEVICES=0 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=0
Worker 1: CUDA_VISIBLE_DEVICES=1 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=1
(Tested with TensorFlow 0.12. A version modified for tf 1.0.0 performs even worse.)
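For reference, the tf 1.0.0 port mostly comes down to the API renames that the commented-out lines in the code above already point at. The sketch below shows the 1.0-style calls for the pieces that change, assuming the rest of the script stays as-is (the swapped tf.concat argument order is the one change the comments do not mention):

# 1.0-style equivalents of the 0.12 calls used above (sketch only)
def lstm_cell():
    return tf.contrib.rnn.BasicLSTMCell(size, forget_bias=0.0, state_is_tuple=True)
cell = tf.contrib.rnn.MultiRNNCell(
    [attn_cell() for _ in range(config.num_layers)], state_is_tuple=True)
output = tf.reshape(tf.concat(outputs, 1), [-1, size])  # tf.concat(values, axis) in 1.0
loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
    [logits],
    [tf.reshape(input_.targets, [-1])],
    [tf.ones([batch_size * num_steps], dtype=data_type())])
init_op = tf.global_variables_initializer()
tf.summary.scalar("Training Loss", m.cost)  # replaces tf.scalar_summary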
【Question comments】:
【Answer 1】: What if you do this with single-process TensorFlow, i.e. a single process with CUDA_VISIBLE_DEVICES=0,1? It makes sense to try that first, to rule out problems that have nothing to do with distributed TensorFlow.
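For context, the single-process check suggested here is the in-graph multi-tower pattern: one copy of the model per GPU inside a single process, shared weights, and gradients averaged before one update. Below is a minimal sketch of that pattern; build_tower_loss is a toy stand-in (not part of the original code) for building the unrolled PTB graph over one shard of the batch, and the shapes are the small-config batch_size x num_steps.

import tensorflow as tf

NUM_GPUS = 2  # launch the process with CUDA_VISIBLE_DEVICES=0,1

def build_tower_loss(input_ids, targets):
    # Tiny stand-in graph so the sketch builds end to end; the real code would
    # construct the unrolled LSTM + softmax exactly as PTBModel does above.
    embedding = tf.get_variable("embedding", [10000, 200])
    inputs = tf.reduce_mean(tf.nn.embedding_lookup(embedding, input_ids), 1)
    softmax_w = tf.get_variable("softmax_w", [200, 10000])
    logits = tf.matmul(inputs, softmax_w)
    return tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
        logits=logits, labels=targets[:, 0]))

tower_grads = []
with tf.variable_scope("Model"):
    for i in range(NUM_GPUS):
        with tf.device("/gpu:%d" % i), tf.name_scope("tower_%d" % i):
            input_ids = tf.placeholder(tf.int32, [20, 20])  # batch_size x num_steps
            targets = tf.placeholder(tf.int32, [20, 20])
            loss = build_tower_loss(input_ids, targets)
            tower_grads.append(tf.gradients(loss, tf.trainable_variables()))
        tf.get_variable_scope().reuse_variables()  # all towers share one set of weights

# Average the per-tower gradients on the host and apply a single update.
avg_grads = [tf.add_n(list(g)) / float(NUM_GPUS) for g in zip(*tower_grads)]
train_op = tf.train.GradientDescentOptimizer(1.0).apply_gradients(
    zip(avg_grads, tf.trainable_variables()))

If this in-graph version already scales well (with allow_soft_placement=True in the session config), the slowdown is specific to the ps/worker path rather than to the model itself.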
I've seen cases of distributed TensorFlow running the model from https://github.com/rafaljozefowicz/lm where 8 GPUs on one worker were 8x faster than 8 TensorFlow workers.
Some digging turned up this issue as a big contributor: https://github.com/tensorflow/tensorflow/issues/6116
What's happening is that sending large tensors over gRPC is very inefficient. The fix is already in master but not yet in TF 1.0, so you have to grab the latest nightly to try it.
【Comments】:
Thanks a lot! A single process with GPUs 0,1 visible doesn't seem to work. I also read issue 6116 and built a grpc-fix version from source (github.com/llhe/tensorflow/tree/grpc-fix). The distributed version is indeed almost 2x faster than the single-GPU version; however, the absolute performance of both the distributed and the single-GPU builds drops sharply compared with the official pip wheel (distributed: 1450 wps per worker with two workers; single-GPU: 1613 wps).
I also built Yahoo's RDMA-accelerated version, and it has the same problem (2x speedup, but the absolute performance of both versions drops badly).
Can you figure out where the bottleneck is? You can add tf.Print nodes at various points in the model, which print log statements with microsecond-granularity timestamps to stderr. There are models that scale almost linearly with the number of machines (e.g., GoogLeNet on 3 to 8 machines), so it may well be a problem with the model rather than with TensorFlow. I tried to check whether you read the data on one machine and then distribute it to the other machines (which would be bad), but couldn't tell from the code you posted.
Thanks, I'll try tf.Print to find the bottleneck. It would be great if there is simply a bug in this code. The code uses between-graph programming, and each worker reads the input data (data/simple-examples/data) from its local machine separately (the statement `raw_data = reader.ptb_raw_data(...)`).
@YaroslavBulatov could you take a look at this question, ***.com/q/58926940/5904928 — why is there a huge difference between the LSTM's final output and its state output?
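A minimal sketch of the tf.Print probing suggested in this thread; the probe placement and message strings are only an illustration of where one might look:

# e.g. inside PTBModel.__init__, right after the loss is computed:
self._cost = cost = tf.Print(cost, [cost], message="cost computed: ")
# ... and a second probe after gradient clipping, to bracket the backward pass:
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), config.max_grad_norm)
grads[0] = tf.Print(grads[0], [tf.global_norm(grads)], message="grads ready: ")

Each time these ops run, TensorFlow writes the message to stderr with a microsecond timestamp, so comparing the two timestamps per step separates compute time from the time spent waiting on variable transfers to and from the ps.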
【Answer 2】: Maybe the bottleneck is disk IO, since the single-GPU version only reaches about 11000 wps with GPU utilization around 60%.
【Comments】: