TensorFlow source code study, part 1 -- TF architecture

Posted by lixiaolun


This series currently has four posts in total.
 
 
The TensorFlow architecture diagram is shown below:
[Figure: TensorFlow architecture diagram]
We mainly use the Python client + C API + distributed master + dataflow executor (worker & ps).
  1. Client:
    Defines the dataflow graph (declaring variables, adding ops such as Add already starts building the graph);
    Calls session.run() to send the graph (the graph def) to the distributed master (see the minimal client sketch after this list).
  2. Distributed master:
    Prunes and optimizes the dataflow graph according to the arguments passed to session.run();
    Uses placer.cc to assign each node of the graph to a machine and device (CPU/GPU);
    Uses graph_partition.cc to split the graph across machines according to the placement result, inserting Send/Recv ops at the cut edges;
    Dispatches the resulting subgraphs to the workers & ps.
  3. Dataflow executor (worker service):
    Executes the graph computation (including the Send/Recv ops).
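To make the client role concrete, here is a minimal sketch (not part of the distributed example below; the constant values are made up for illustration): graph construction only records ops, and nothing runs until session.run() hands the graph def to the master.

import tensorflow as tf

# building the graph: the client only records ops, nothing is executed yet
a = tf.constant(3.0)
b = tf.constant(4.0)
c = tf.add(a, b)

# session.run() is what ships the graph def to the master for execution;
# with a "grpc://host:port" target the graph would go to a remote master instead
with tf.Session() as sess:
    print(sess.run(c))  # 7.0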
The interaction between these components is shown below:
[Figure: interaction diagram of the client, master, and worker services]
Note: "/job:worker/task:0" and "/job:ps/task:0" are both tasks with worker services.
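To make those task names concrete, a graph can pin ops onto either task by name with tf.device. A small sketch (the variable and op names here are only for illustration; this only builds the graph, and running it requires a session connected to such a cluster):

import tensorflow as tf

# both tasks run a worker service, so either one can host ops;
# by convention variables live on the ps task and compute ops on the worker task
with tf.device("/job:ps/task:0"):
    w = tf.Variable(tf.zeros([784, 10]))

with tf.device("/job:worker/task:0"):
    x = tf.placeholder(tf.float32, [None, 784])
    logits = tf.matmul(x, w)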
Below is an example of distributed training code:
'''
Distributed TensorFlow 0.8.0 example of using data parallelism and shared model parameters.
Trains a simple neural network on MNIST for 20 epochs using two parameter servers and two workers (all on one host in this example).
Change the hardcoded host urls below with your own hosts.
Run like this:
python train.py --job_name="ps" --task_index=0
python train.py --job_name="ps" --task_index=1
python train.py --job_name="worker" --task_index=0
python train.py --job_name="worker" --task_index=1
More details here: ischlag.github.io
'''

from __future__ import print_function

import tensorflow as tf
import sys
import time

# cluster specification
parameter_servers = ["node1:2221", "node1:2222"]
workers = [ "node1:2223", "node1:2224"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
# see tensorflow/python/training/server_lib.py (line 97)
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

# config
batch_size = 100
learning_rate = 0.0005
training_epochs = 20
logs_path = "/tmp/mnist/log"

# load mnist data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

if FLAGS.job_name == "ps":
  server.join()
elif FLAGS.job_name == "worker":

    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

        # count the number of updates
        global_step = tf.get_variable('global_step', [],
                                      initializer = tf.constant_initializer(0),
                                      trainable = False)

        # input images
        with tf.name_scope('input'):
          # None -> batch size can be any size, 784 -> flattened mnist image
          x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
          # target 10 output classes
          y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        # with tf.name_scope("weights"):
        #     W1 = tf.Variable(tf.random_normal([784, 100]))
        #     W2 = tf.Variable(tf.random_normal([100, 10]))
        #
        # # bias
        # with tf.name_scope("biases"):
        #     b1 = tf.Variable(tf.zeros([100]))
        #     b2 = tf.Variable(tf.zeros([10]))
        #
        # # implement model
        # with tf.name_scope("softmax"):
        #     # y is our prediction
        #     z2 = tf.add(tf.matmul(x,W1),b1)
        #     a2 = tf.nn.sigmoid(z2)
        #     z3 = tf.add(tf.matmul(a2,W2),b2)
        #     y  = tf.nn.softmax(z3)
        with tf.name_scope("weights"):
            W1 = tf.Variable(tf.random_normal([784, 10]))

        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([10]))

        # implement model
        with tf.name_scope("softmax"):
            # y is our prediction
            z2 = tf.add(tf.matmul(x,W1),b1)
            y  = tf.nn.softmax(z2)

        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            grad_op = tf.train.GradientDescentOptimizer(learning_rate)
            '''
            rep_op = tf.train.SyncReplicasOptimizer(grad_op,                    
                                                  replicas_to_aggregate=len(workers),
                                                  replica_id=FLAGS.task_index, 
                                                  total_num_replicas=len(workers),
                                                  use_locking=True)
            train_op = rep_op.minimize(cross_entropy, global_step=global_step)
            '''
            train_op = grad_op.minimize(cross_entropy, global_step=global_step)

        '''
        init_token_op = rep_op.get_init_tokens_op()
        chief_queue_runner = rep_op.get_chief_queue_runner()
        '''

        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        # create a summary for our cost and accuracy
        tf.summary.scalar("cost", cross_entropy)
        tf.summary.scalar("accuracy", accuracy)

        # merge all summaries into a single "operation" which we can execute in a session
        summary_op = tf.summary.merge_all()
        init_op = tf.initialize_all_variables()
        print("Variables initialized ...")

    sv = tf.train.Supervisor(logdir='/tmp/distributed_mnist', is_chief=(FLAGS.task_index == 0),
                                                        global_step=global_step,
                                                        init_op=init_op)

    begin_time = time.time()
    frequency = 100
    with sv.prepare_or_wait_for_session(server.target) as sess:
        '''
        # is chief
        if FLAGS.task_index == 0:
            sv.start_queue_runners(sess, [chief_queue_runner])
            sess.run(init_token_op)
        '''
        # create log writer object (this will log on every machine)
        writer = tf.summary.FileWriter(logs_path, graph=tf.get_default_graph())

        # perform training cycles
        start_time = time.time()
        for epoch in range(training_epochs):

            # number of batches in one epoch
            batch_count = int(mnist.train.num_examples/batch_size)

            count = 0
            for i in range(batch_count):
                batch_x, batch_y = mnist.train.next_batch(batch_size)

                # perform the operations we defined earlier on batch
                _, cost, summary, step = sess.run(
                                                [train_op, cross_entropy, summary_op, global_step],
                                                feed_dict={x: batch_x, y_: batch_y})
                writer.add_summary(summary, step)

                count += 1
                if count % frequency == 0 or i+1 == batch_count:
                    elapsed_time = time.time() - start_time
                    start_time = time.time()
                    print("Step: %d," % (step+1),
                                " Epoch: %2d," % (epoch+1),
                                " Batch: %3d of %3d," % (i+1, batch_count),
                                " Cost: %.4f," % cost,
                                " AvgTime: %3.2fms" % float(elapsed_time*1000/frequency))
                    count = 0


        print("Test-Accuracy: %2.2f" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
        print("Total Time: %3.2fs" % float(time.time() - begin_time))
        print("Final Cost: %.4f" % cost)

    sv.stop()
    print("done")

 

Here, we use Between-graph replication with Asynchronous training.
Note that TF supports multiple replicated training modes; see https://www.tensorflow.org/deploy/distributed#replicated_training

  • Between-graph replication: set up device placement with tf.train.replica_device_setter;
  • Asynchronous training: each worker applies its gradients independently, using a plain GradientDescentOptimizer.

For synchronous training, wrap the optimizer in tf.train.SyncReplicasOptimizer instead (see the sketch below).
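For reference, here is roughly how the synchronous variant would be wired up, pieced together from the commented-out lines in the example above and reusing its variables (a sketch only; the replica_id argument used in the 0.8-era snippet was dropped in later TF 1.x releases, so it is omitted here):

            # wrap the plain optimizer so gradients from all workers are aggregated
            # before a single update is applied to the shared variables
            grad_op = tf.train.GradientDescentOptimizer(learning_rate)
            rep_op = tf.train.SyncReplicasOptimizer(grad_op,
                                                    replicas_to_aggregate=len(workers),
                                                    total_num_replicas=len(workers),
                                                    use_locking=True)
            train_op = rep_op.minimize(cross_entropy, global_step=global_step)

        # the chief worker additionally runs the queue runner that gathers the
        # aggregated gradients and issues the initial sync tokens
        init_token_op = rep_op.get_init_tokens_op()
        chief_queue_runner = rep_op.get_chief_queue_runner()

    with sv.prepare_or_wait_for_session(server.target) as sess:
        if FLAGS.task_index == 0:   # chief
            sv.start_queue_runners(sess, [chief_queue_runner])
            sess.run(init_token_op)
        # ... training loop as above ...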
