Between-graph replication version of the PTB rnn model is slower than the single-GPU version (even in tf 1.0.0)

【Title】Between-graph replication version of the PTB rnn model is slower than the single-GPU version (even in tf 1.0.0) 【Posted】2017-02-28 05:17:51 【Problem Description】:

I changed the PTB model (which you can find under tensorflow/models/tutorials/rnn/ptb) into a between-graph replicated version, but this distributed version (with 1 ps server and 2 workers) shows no speedup, even when the ps and the workers run on the same machine. Timeline profiling shows significant delays between the GPU ops and the CPU ops of the distributed version. Below are the code and the timeline figures:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

import numpy as np
import tensorflow as tf

import reader
import tempfile

flags = tf.flags
logging = tf.logging

flags.DEFINE_string(
    "model", "small",
    "A type of model. Possible options are: small, medium, large.")
flags.DEFINE_string("data_path", None,
                    "Where the training/test data is stored.")
flags.DEFINE_string("save_path", None,
                    "Model output directory.")
flags.DEFINE_bool("use_fp16", False,
                  "Train using 16-bit floats instead of 32bit floats")

flags.DEFINE_string("ps_hosts","IP1:2222",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "IP1:2223,IP1:2224",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None,"job name: worker or ps")
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task the performs the variable "
                     "initialization ")
flags.DEFINE_integer("num_gpus", 1,
                     "Total number of gpus for each machine."
                     "If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
                     "Number of replicas to aggregate before parameter update"
                     "is applied (For sync_replicas mode only; default: "
                     "num_workers)")
flags.DEFINE_boolean("sync_replicas", False,
                     "Use the sync_replicas (synchronized replicas) mode, "
                     "wherein the parameter updates from workers are aggregated "
                     "before applied to avoid stale gradients")
flags.DEFINE_boolean(
    "existing_servers", False, "Whether servers already exists. If True, "
    "will use the worker hosts via their GRPC URLs (one client process "
    "per worker host). Otherwise, will create an in-process TensorFlow "
    "server.")

FLAGS = flags.FLAGS


def data_type():
  return tf.float16 if FLAGS.use_fp16 else tf.float32


class PTBInput(object):
  """The input data."""

  def __init__(self, config, data, ix, worker_num, name=None):
    data_len = len(data) // worker_num
    data = data[data_len * ix:data_len * (ix + 1)]

    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)


class PTBModel(object):
  """The PTB model."""

  def __init__(self, is_training, config, input_, num_workers=0, global_step=None):
    self._input = input_

    batch_size = input_.batch_size
    num_steps = input_.num_steps
    size = config.hidden_size
    vocab_size = config.vocab_size

    # Slightly better results can be obtained with forget gate biases
    # initialized to 1 but the hyperparameters of the model would need to be
    # different than reported in the paper.
    def lstm_cell():
      # return tf.contrib.rnn.BasicLSTMCell(
      return tf.nn.rnn_cell.BasicLSTMCell(
          size, forget_bias=0.0, state_is_tuple=True)
    attn_cell = lstm_cell
    if is_training and config.keep_prob < 1:
      def attn_cell():
        return tf.contrib.rnn.DropoutWrapper(
            lstm_cell(), output_keep_prob=config.keep_prob)
    # cell = tf.contrib.rnn.MultiRNNCell(
    cell = tf.nn.rnn_cell.MultiRNNCell(
        [attn_cell() for _ in range(config.num_layers)], state_is_tuple=True)

    self._initial_state = cell.zero_state(batch_size, data_type())

    with tf.device("/cpu:0"):
      embedding = tf.get_variable(
          "embedding", [vocab_size, size], dtype=data_type())
      inputs = tf.nn.embedding_lookup(embedding, input_.input_data)

    if is_training and config.keep_prob < 1:
      inputs = tf.nn.dropout(inputs, config.keep_prob)

    # Simplified version of models/tutorials/rnn/rnn.py's rnn().
    # This builds an unrolled LSTM for tutorial purposes only.
    # In general, use the rnn() or state_saving_rnn() from rnn.py.
    #
    # The alternative version of the code below is:
    #
    # inputs = tf.unstack(inputs, num=num_steps, axis=1)
    # outputs, state = tf.nn.rnn(cell, inputs,
    #                            initial_state=self._initial_state)
    outputs = []
    state = self._initial_state
    with tf.variable_scope("RNN"):
      for time_step in range(num_steps):
        if time_step > 0: tf.get_variable_scope().reuse_variables()
        (cell_output, state) = cell(inputs[:, time_step, :], state)
        outputs.append(cell_output)

    output = tf.reshape(tf.concat(1, outputs), [-1, size])
    softmax_w = tf.get_variable(
        "softmax_w", [size, vocab_size], dtype=data_type())
    softmax_b = tf.get_variable("softmax_b", [vocab_size], dtype=data_type())
    logits = tf.matmul(output, softmax_w) + softmax_b
    # loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
    loss = tf.nn.seq2seq.sequence_loss_by_example(
        [logits],
        [tf.reshape(input_.targets, [-1])],
        [tf.ones([batch_size * num_steps], dtype=data_type())])
    self._cost = cost = tf.reduce_sum(loss) / batch_size
    self._final_state = state

    if not is_training:
      return

    self._lr = tf.Variable(0.0, trainable=False)
    tvars = tf.trainable_variables()
    grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars),
                                      config.max_grad_norm)
    self._opt = tf.train.GradientDescentOptimizer(self._lr)

    if FLAGS.sync_replicas:
      if FLAGS.replicas_to_aggregate is None:
        replicas_to_aggregate = num_workers
      else:
        replicas_to_aggregate = FLAGS.replicas_to_aggregate

      self._opt = tf.train.SyncReplicasOptimizer(
        self._opt,
        replicas_to_aggregate=replicas_to_aggregate,
        total_num_replicas=num_workers,
        name="ptb_sync_replicas")

    # train_step = opt.minimize(cross_entropy, global_step=global_step)

    self._train_op = self._opt.apply_gradients(
        zip(grads, tvars),
        global_step)
        # global_step=tf.contrib.framework.get_or_create_global_step())

    self._new_lr = tf.placeholder(
        tf.float32, shape=[], name="new_learning_rate")
    self._lr_update = tf.assign(self._lr, self._new_lr)

  def assign_lr(self, session, lr_value):
    session.run(self._lr_update, feed_dict={self._new_lr: lr_value})

  @property
  def input(self):
    return self._input

  @property
  def initial_state(self):
    return self._initial_state

  @property
  def cost(self):
    return self._cost

  @property
  def final_state(self):
    return self._final_state

  @property
  def lr(self):
    return self._lr

  @property
  def opt(self):
    return self._opt

  @property
  def train_op(self):
    return self._train_op


class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000


class MediumConfig(object):
  """Medium config."""
  init_scale = 0.05
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 35
  hidden_size = 650
  max_epoch = 6
  max_max_epoch = 39
  keep_prob = 0.5
  lr_decay = 0.8
  batch_size = 20
  vocab_size = 10000


class LargeConfig(object):
  """Large config."""
  init_scale = 0.04
  learning_rate = 1.0
  max_grad_norm = 10
  num_layers = 2
  num_steps = 35
  hidden_size = 1500
  max_epoch = 14
  max_max_epoch = 55
  keep_prob = 0.35
  lr_decay = 1 / 1.15
  batch_size = 20
  vocab_size = 10000


class TestConfig(object):
  """Tiny config, for testing."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 1
  num_layers = 1
  num_steps = 2
  hidden_size = 2
  max_epoch = 1
  max_max_epoch = 1
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000


def run_epoch(session, model, global_step, eval_op=None, verbose=False):
  """Runs the model on the given data."""
  start_time = time.time()
  costs = 0.0
  iters = 0
  state = session.run(model.initial_state)

  fetches = {
      "cost": model.cost,
      "final_state": model.final_state,
      "global_step": global_step,
  }
  if eval_op is not None:
    fetches["eval_op"] = eval_op

  for step in range(model.input.epoch_size):
    feed_dict = {}
    for i, (c, h) in enumerate(model.initial_state):
      feed_dict[c] = state[i].c
      feed_dict[h] = state[i].h

    vals = session.run(fetches, feed_dict)
    cost = vals["cost"]
    state = vals["final_state"]

    costs += cost
    iters += model.input.num_steps

    if verbose and step % (model.input.epoch_size // 10) == 10:
      print("%.3f perplexity: %.3f speed: %.0f wps" %
            (step * 1.0 / model.input.epoch_size, np.exp(costs / iters),
             iters * model.input.batch_size / (time.time() - start_time)))
  print("esize is %.3f, one epoch time: %.0f s" % (step,(time.time() - start_time))) 
  return np.exp(costs / iters)


def get_config():
  if FLAGS.model == "small":
    return SmallConfig()
  elif FLAGS.model == "medium":
    return MediumConfig()
  elif FLAGS.model == "large":
    return LargeConfig()
  elif FLAGS.model == "test":
    return TestConfig()
  else:
    raise ValueError("Invalid model: %s", FLAGS.model)


def main(_):
  if not FLAGS.data_path:
    raise ValueError("Must set --data_path to PTB data directory")

  if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
  if FLAGS.task_index is None or FLAGS.task_index =="":
    raise ValueError("Must specify an explicit `task_index`")

  print("job name = %s" % FLAGS.job_name)
  print("task index = %d" % FLAGS.task_index)

  #Construct the cluster and start the server
  ps_spec = FLAGS.ps_hosts.split(",")
  worker_spec = FLAGS.worker_hosts.split(",")

  # Get the number of workers.
  num_workers = len(worker_spec)

  cluster = tf.train.ClusterSpec({
      "ps": ps_spec,
      "worker": worker_spec})

  if not FLAGS.existing_servers:
    # Not using existing servers. Create an in-process server.
    server = tf.train.Server(
        cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    if FLAGS.job_name == "ps":
      server.join()

  is_chief = (FLAGS.task_index == 0)
  if FLAGS.num_gpus > 0:
    # if FLAGS.num_gpus < num_workers:
    #  raise ValueError("number of gpus is less than number of workers")
    # Avoid gpu allocation conflict: now allocate task_num -> #gpu
    # for each worker in the corresponding machine
    gpu = 0 # (FLAGS.task_index % FLAGS.num_gpus)
    worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
  elif FLAGS.num_gpus == 0:
    # Just allocate the CPU to worker server
    cpu = 0
    worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)
  # The device setter will automatically place Variables ops on separate
  # parameter servers (ps). The non-Variable ops will be placed on the workers.
  # The ps use CPU and workers use corresponding GPU
  raw_data = reader.ptb_raw_data(FLAGS.data_path)
  train_data, valid_data, test_data, _ = raw_data

  config = get_config()
  eval_config = get_config()
  eval_config.batch_size = 1
  eval_config.num_steps = 1

  # with tf.Graph().as_default():
  with tf.device(
      tf.train.replica_device_setter(
          worker_device=worker_device,
          ps_device="/job:ps/cpu:0",
          cluster=cluster)):
    '''raw_data = reader.ptb_raw_data(FLAGS.data_path)
    train_data, valid_data, test_data, _ = raw_data

    config = get_config()
    eval_config = get_config()
    eval_config.batch_size = 1
    eval_config.num_steps = 1'''

    # with tf.Graph().as_default():
    global_step = tf.Variable(0, name="global_step", trainable=False)
    initializer = tf.random_uniform_initializer(-config.init_scale,
                                                config.init_scale)

    with tf.name_scope("Train"):
      train_input = PTBInput(config=config, data=train_data,
                             ix=FLAGS.task_index, worker_num=num_workers, name="TrainInput")
      with tf.variable_scope("Model", reuse=None, initializer=initializer):
        m = PTBModel(is_training=True, config=config, input_=train_input, num_workers=num_workers,
                     global_step = global_step)
      tf.scalar_summary("Training Loss", m.cost)
      tf.scalar_summary("Learning Rate", m.lr)

    if FLAGS.sync_replicas:
      local_init_op = m.opt.local_step_init_op
      if is_chief:
        local_init_op = m.opt.chief_init_op

      ready_for_local_init_op = m.opt.ready_for_local_init_op

      # Initial token and chief queue runners required by the sync_replicas mode
      chief_queue_runner = m.opt.get_chief_queue_runner()
      sync_init_op = m.opt.get_init_tokens_op()

    # init_op = tf.global_variables_initializer()
    init_op = tf.initialize_all_variables()
    train_dir = tempfile.mkdtemp()

    with tf.name_scope("Valid"):
      valid_input = PTBInput(config=config, data=valid_data,
                             ix=FLAGS.task_index, worker_num=num_workers, name="ValidInput")
      with tf.variable_scope("Model", reuse=True, initializer=initializer):
        mvalid = PTBModel(is_training=False, config=config, input_=valid_input, num_workers=num_workers,
                          global_step=global_step)
      tf.scalar_summary("Validation Loss", mvalid.cost)

    with tf.name_scope("Test"):
      test_input = PTBInput(config=eval_config, data=test_data,
                            ix=0, worker_num=1, name="TestInput")
      with tf.variable_scope("Model", reuse=True, initializer=initializer):
        mtest = PTBModel(is_training=False, config=eval_config,
                         input_=test_input, num_workers=num_workers,
                         global_step=global_step)

    if FLAGS.sync_replicas:
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          local_init_op=local_init_op,
          ready_for_local_init_op=ready_for_local_init_op,
          recovery_wait_secs=1,
          global_step=global_step)
    else:
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          recovery_wait_secs=1,
          global_step=global_step)

    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=False,
        device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

    # The chief worker (task_index==0) session will prepare the session,
    # while the remaining workers will wait for the preparation to complete.
    if is_chief:
      print("Worker %d: Initializing session..." % FLAGS.task_index)
    else:
      print("Worker %d: Waiting for session to be initialized..." %
            FLAGS.task_index)

    if FLAGS.existing_servers:
      server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
      print("Using existing server at: %s" % server_grpc_url)

      session = sv.prepare_or_wait_for_session(server_grpc_url,
                                            config=sess_config)
    else:
      session = sv.prepare_or_wait_for_session(server.target, config=sess_config)

    print("Worker %d: Session initialization complete." % FLAGS.task_index)

    if FLAGS.sync_replicas and is_chief:
      # Chief worker will start the chief queue runner and call the init op.
      session.run(sync_init_op)
      sv.start_queue_runners(session, [chief_queue_runner])

    # sv = tf.train.Supervisor(logdir=FLAGS.save_path)
    # with sv.managed_session() as session:
    for i in range(config.max_max_epoch):
      lr_decay = config.lr_decay ** max(i + 1 - config.max_epoch, 0.0)
      m.assign_lr(session, config.learning_rate * lr_decay)

      print("Epoch: %d Learning rate: %.3f" % (i + 1, session.run(m.lr)))
      train_perplexity = run_epoch(session, m, global_step, eval_op=m.train_op,
                                   verbose=True)
      print("Epoch: %d Train Perplexity: %.3f" % (i + 1, train_perplexity))
      valid_perplexity = run_epoch(session, mvalid, global_step)
      print("Epoch: %d Valid Perplexity: %.3f" % (i + 1, valid_perplexity))

    test_perplexity = run_epoch(session, mtest, global_step)
    print("Test Perplexity: %.3f" % test_perplexity)

    if FLAGS.save_path:
      print("Saving model to %s." % FLAGS.save_path)
      sv.saver.save(session, FLAGS.save_path, global_step=sv.global_step)


if __name__ == "__main__":
  tf.app.run()

The timeline of the original single-GPU version (for comparison with the 1-ps-server, 2-worker distributed setup) is as follows, for one iteration:

[timeline image]

The timeline of worker 0 in the distributed version (one iteration) is as follows; worker 1's timeline is similar:

[timeline image]

The machine has two Tesla M40 GPUs. The single-GPU version runs at about 11000 wps (GPU utilization around 60%), while the between-graph version reaches only about 6000 wps per worker (GPU utilization just over 30% per GPU), so the speedup with two workers (two GPUs) is only about 1.09. Meanwhile, I also made a multi-GPU version of the PTB model (without the distributed between-graph or in-graph framework), which gets a speedup of more than 1.6 on the same machine. So what causes the poor performance of the distributed version?

Run commands:

PS: CUDA_VISIBLE_DEVICES="" python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=ps --task_index=0

Worker 0: CUDA_VISIBLE_DEVICES=0 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=0

Worker 1: CUDA_VISIBLE_DEVICES=1 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=1

(Tested with TensorFlow 0.12. A version modified for tf 1.0.0 performs even worse.)

【Comments】:

【Answer 1】:

What if you run this with single-process TensorFlow, i.e. a single process with CUDA_VISIBLE_DEVICES=0,1? It makes sense to try that first, to rule out problems unrelated to distributed TensorFlow.
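For reference, a single-process two-GPU run could look something like the line below; ptb_word_lm_multi_gpu.py is a hypothetical multi-GPU variant of the tutorial script (the stock single-GPU script would not use the second device by itself):

CUDA_VISIBLE_DEVICES=0,1 python ptb_word_lm_multi_gpu.py --data_path=/data/simple-examples/data/ --model small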

I have seen a case of running the model from https://github.com/rafaljozefowicz/lm with distributed TensorFlow in which 8 GPUs on a single worker were 8x faster than 8 single-GPU TensorFlow workers.

Some digging turned up this issue as a major contributor: https://github.com/tensorflow/tensorflow/issues/6116

What is happening is that sending large tensors over gRPC is very inefficient. The fix is already in master but not yet in TF 1.0, so you would have to grab the latest nightly to try it.
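For example, assuming a prebuilt nightly wheel is available for your platform (the URL below is only a placeholder), you could upgrade and then confirm which build you are actually running:

pip install --upgrade <URL-of-nightly-tensorflow-gpu-wheel>
python -c "import tensorflow as tf; print(tf.__version__)"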

【Comments】:

Thanks a lot! A single process with GPUs 0 and 1 visible does not seem to work. I also read issue 6116 and built a grpc-fix version from source (github.com/llhe/tensorflow/tree/grpc-fix). The distributed version is indeed nearly 2x faster than the original single-GPU version; however, the absolute performance of both the distributed and the single-GPU builds drops sharply compared with the official pip wheel (distributed: 1450 wps per worker with two workers; single-GPU: 1613 wps).

I also built Yahoo's RDMA-accelerated version, and it has the same problem (a 2x speedup, but the absolute performance of both versions degrades badly).

Can you find out where the bottleneck is? You can add "tf.Print" nodes at various points in the model, which print log statements with microsecond-granularity timestamps to stderr (a sketch follows below). Various models scale almost linearly with the number of machines (e.g. GoogLeNet, 3 to 8 machines), so it may be the model rather than a TensorFlow bug. I tried to check whether you read the data on one machine and distribute it to multiple machines (which is bad), but could not tell from the code you posted.

Thanks, I will try tf.Print to locate the bottleneck. It would be great if there is a bug in this code. The code uses between-graph programming, and each worker reads the input data (/data/simple-examples/data) from its local machine separately (the statement "raw_data = reader.ptb_raw_data(...)").

@YaroslavBulatov could you take a look at this question, ***.com/q/58926940/5904928 — why is there a huge difference between the LSTM final output and the state output?
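A minimal sketch of the tf.Print instrumentation suggested in the comments above: wrapping the cost tensor inside PTBModel.__init__ so every training step logs its value to stderr (the message text and the choice of tensor are only illustrative):

    # Identity op that logs the per-step cost each time the graph is run;
    # the printed lines carry timestamps, so gaps between steps become visible.
    self._cost = cost = tf.Print(cost, [cost], message="worker step cost: ")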

【Answer 2】:

The bottleneck may be disk I/O, since the single-GPU version only reaches about 11000 wps (GPU utilization around 60%).
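A rough way to test the disk-I/O hypothesis (a sketch reusing the names from the code above, meant to run in place of a few training steps) is to time the input pipeline on its own and compare the resulting words-per-second figure with the full training loop:

# Fetch only the queued input batches, skipping the LSTM entirely;
# if this is not much faster than ~11000 wps, the input pipeline is the limit.
start = time.time()
for _ in range(100):
  session.run([m.input.input_data, m.input.targets])
print("input-only speed: %.0f wps" %
      (100 * m.input.batch_size * m.input.num_steps / (time.time() - start)))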

【Comments】:
