实战:Spark和Tensorflow整合
Posted Spark技术日报
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战:Spark和Tensorflow整合相关的知识,希望对你有一定的参考价值。
1
Spark 和 Tensorflow 整合
知识点:
Tensorflow 编程
PySpark MLlib 相关知识
PySpark 相关知识
这个章节工程的东西偏多,可能大数据架构或者研发更适合看。前面【PySpark MLlib 基础】已经做了一些铺垫,比如把 Spark Deep Learning 打成了一个Jar包。还记得这个文件么?
target/scala-2.11/spark-deep-learning-assembly-0.2.0-spark2.1.jar
Spark Deep Learning (后续我会简称为SDL)项目就是为了让 Spark 和Tensorflow 整合的一个项目。
Spark 成功的实现了当年的承诺,让数据处理变得更容易,现在,雄心勃勃的 Databricks 公司展开了一个新的愿景:让深度学习变得更容易。 SDL则是这个愿景的产物。
尽管如此,SDL 现阶段依然有几个问题:
进度缓慢
只做了图像相关的工作,还没有NLP相关的工具使用
分布式调参功能,基本不可用
不支持 TensorFlow 分布式训练
第二点经过我一段时间的努力,SDL 现在也支持 NLP 相关的处理了,更实际的应用会在下一篇文章中说明。
先对以下两点做些解释:
所谓分布式调参功能,是指运行多个 Tensorflow 实例,但是给不同的超参数,可以很大的加快我们 tunning 的效率。
TensorFlow 分布式训练指的是,TF 自身支持 PS 和 Worker 角色的划分(这个是机器学习的典型架构),但是 SDL 不支持这点。
为啥说 SDL 的分布式调参功能不可用呢?原因是现阶段 SDL 会把所有数据都 collect 到 driver 端,然后 broadcast 到每个 executor 节点,事实上是,随便一个数据集都远大于单一节点的内存,这在实际场景里基本是无法忍受的。
因此需要对 SDL 做一些改造
Spark 数据处理是完全分布式的,比如进行特征工程,进行 word2vec 训练之类,所以最后数据可能在多个分区里。Tensorflow 也会在多个Executor 里,默认某个 Tensorflow 所在 Executor 是没办法到其他Executor 去获取数据的,为了解决这个问题,我们引入 Kafka。
Spark 做完数据预处理后,会将数据写入 Kafka,同时在新 Task 里启动TensorFlow 实例,并且到 Kafka 拉取数据,喂给 Tensorflow 实例。 Kafka 的效率非常高,基本可以吃满你的网卡,所以性能不用担心。
下面我们看看具体实现
通过 Tensorflow 进行训练,所以我们要实现本质上是 Estimator,如果大家忘了 Estimator 是个什么概念,不妨回到【PySpark MLlib 基础】再温习下概念。这里我们实现的是一个 TextEstimator(名字不算准确,TFEstimator 可能更好)。在看具体代码之前,先假设我们已经实现了这个 TextEstimator,现在展示这个工具类怎么使用的:
estimator = TextEstimator(kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", "group_id": "sdl_1", "test_mode": False},
runningMode="Normal",
fitParam=[{"epochs": 10, "batch_size": 32, "sequence_length": 64, "num_classes": num_classes, "word_embedding_bs": word2v_mapping_br.value, "tensor_board_dir": "./tb_dir6", "dev_collection": dev_collection}],
mapFnParam=text_cnn_map_fun)
有点复杂。第一个是 Kafka 的参数,比如服务器在哪,组名是什么。第二个参数是运行的模式,Normal 表示的是分布式调参功能,TFoS 则表示分布式训练。fitParam 填充的是一些模型超参数,给多少组,就会有多少个 ensorflow 实例同时运行。mapFnParam 则是你的Tensorflow代码。
我们简要看下 mapFnParam 的定义:
def text_cnn_map_fun(args={}, ctx=None, _read_data=None):
import tensorflow as tf import sys from sklearn.utils import Bunch
FLAGS = Bunch(**args["params"]["fitParam"])
embedded_vec = FLAGS.word_embedding_bs
函数名称随便定义,只要保持和 TextEstimator 配置的一样即可。它接收三个参数,分别是:
args 外部传递过来的参数
ctx 如果是 TFoS 模式的话,表示的 Tensorflow 的分布式上下文。如果是 Normal 模式则为 None
_read_data 一个迭代器,可以方便拿到语料
比如我想拿到前面配置的每个 batch 的大小,那么通过下面的方式获取。
BATCHS_SIZE = FLAGS.batch_size
如果我要拿训练数据,按 BATCHS_SIZE 一批一批的拿,像下面这样:
for items in _read_data(max_records=BATCHS_SIZE):
X = [item["features"] for item in items]
Y = [item["label"].toArray() for item in items]
_, gs = sess.run([train_step, global_step],
feed_dict={input_x: X, input_y: Y})
可以看到,非常方便。要运行起来也比较简单:
export JAR="....../target/scala-2.11/spark-deep-learning-assembly-0.2.0-spark2.2.jar"export PYTHONIOENCODING=utf8;./bin/spark-submit \
--driver-memory 12g \
--py-files $JAR,$PY_HOME/your_tensforlow_program.py \
--jars $JAR \
--master "local[*]" $PY_HOME/you_spark_main_program.py
现在我们来看看 TextEstimator 是如何实现的,优先看 Normal 模式,也就是分布式调参,TFoS 后续会提及。
def _fitInParallel(self, dataset, paramMaps):
from time import gmtime, strftime
kafaParams = self.getKafkaParam()
topic = kafaParams["topic"] + "_" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
group_id = kafaParams["group_id"]
bootstrap_servers = kafaParams["bootstrap_servers"]
首先我们获取 Kafka 参数,这里需要注意的是因为我们可能会反复进行训练,如果都是往同一 topic 写,会产生问题,所以我们会给 topic 加一个时间后缀,这样每次训练都会使用新的 topic,但是外部不会有感知。
def _write_data():
def _write_partition(index, d_iter):
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers) try: for d in d_iter:
producer.send(topic, pickle.dumps(d))
producer.send(topic, pickle.dumps("_stop_"))
producer.flush() finally:
producer.close() return []
dataset.rdd.mapPartitionsWithIndex(_write_partition).count()
t = threading.Thread(target=_write_data)
t.start()
定义了一个写 Kafka 的方法,并且启动一个新线程往里面写。Spark 每个分区写入完成,都会写入一个_stop_
标记。这样方便消费者知道是不是还需要继续消费。
接着定义消费的方法:
def _local_fit(override_param_map):
def _read_data(max_records=64):
consumer = KafkaConsumer(
topic,
auto_offset_reset="earliest",
enable_auto_commit=False
) try:
stop_count = 0
fail_msg_count = 0
while True: if kafka_test_mode:
time.sleep(1)
messages = consumer.poll(timeout_ms=1000, max_records=max_records)
group_msgs = [] for tp, records in messages.items(): for record in records: try:
msg_value = pickle.loads(record.value) if msg_value == "_stop_":
stop_count += 1
else:
group_msgs.append(msg_value) except:
fail_msg_count += 0
pass
if len(group_msgs) > 0: yield group_msgs
if stop_count >= stop_flag_num and len(group_msgs) == 0: break
finally:
consumer.close()
result = self.getMapFnParam()(args={"params": params},
ctx=None,
_read_data=_read_data) return result return paramMapsRDD.map(lambda paramMap: (paramMap, _local_fit(paramMap)))
这里有几个点需要注意:
消费时需要从头 k 开始消费,并且不自动 commit 消费进度。
消费时,会进行计数,如果发现
_stop_
标记达到了要求,就停止消费。把前面配置的fitParams参数构建成一个 RDD,并且确保每个子参数集对应一个分区。
代码里的_read_data
就是前面我们看到的 Tensorflow 的入口函数。做法还是比较简单的。
上面就是所有的核心逻辑了。
前面我们提到 SDL 的第四个缺点:不支持 TensorFlow 分布式训练。为了解决这个问题,我们需要引入 TFoS。 TFoS 是 Yahoo 开发的一个开源项目,利用 Spark 的分布式机制自动实现 Tensorflow 分布式集群的构建,非常方便。所以我们只要集成到 SDL 就好。同样的,在看实现代码之前,我们先看看是怎么用的。
因为TFoS无法用 Spark Local 模式跑,需要启动 Standalone 模式。完整的示例代码可以在 TFoSExample。
使用方式和前面一样:
estimator = TextEstimator(fitParam=[{"epochs": 1, "cluster_size": 2, "batch_size": 1, "model": "/tmp/model"}],
runningMode="TFoS",
mapFnParam=map_fun)
estimator.fit(df).collect()
map_fun 里 tensorflow 里面的代码有较大不同:
需要获取ClusterServer等相关对象
使用tf.train.Supervisor等来管理会话
区分ps 和worker角色
了解完使用,大家也可能好奇 SDL 和 TFoS 集成是如何实现的:
def _fitInCluster(self, dataset, paramMaps):
sc = JVMAPI._curr_sc()
baseParamMap = self.extractParamMap()
baseParamDict = dict([(param.name, val) for param, val in baseParamMap.items()])
args = self._clusterModelDefaultValue(sc, paramMaps[0])
args["params"] = baseParamDict
cluster = TFCluster.run(sc, self.getMapFnParam(), args, args['cluster_size'], args['num_ps'],
args['tensorboard'],
TFCluster.InputMode.SPARK)
cluster.train(dataset.rdd, args["epochs"])
cluster.shutdown()
使用 TFoS 的 TFCluster 来创建具体集群,传递诸如cluster_size
, num_ps
等相关信息即可。可以看到,实现非常简单。但是TF里的代码如前所述,会比较复杂一些。
Spark 和 Tensorflow 通过 TextEstimator 集成,解决了我上面说的几个痛点,同时能够很好的让工程和算法同学进行分工协作,意义重大。
Tips:
一个热爱 Spark 的技术人(记得备注哈)
感谢大家的点赞
↓↓↓
以上是关于实战:Spark和Tensorflow整合的主要内容,如果未能解决你的问题,请参考以下文章
Spark 实战系列Phoenix 整合 spark 进行查询分析
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一