双向LSTM模型的tensorflow实现
Posted kisetsu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了双向LSTM模型的tensorflow实现相关的知识,希望对你有一定的参考价值。
来源:https://github.com/jiangxinyang227/NLP-Project/text_classifier
import tensorflow as tf
from .base import BaseModel
class BiLstmAttenModel(BaseModel):
def __init__(self, config, vocab_size, word_vectors):
super(BiLstmAttenModel, self).__init__(config=config, vocab_size=vocab_size, word_vectors=word_vectors)
# 构建模型
self.build_model()
# 初始化保存模型的saver对象
self.init_saver()
def build_model(self):
# 词嵌入层
with tf.name_scope("embedding"):
# 利用预训练的词向量初始化词嵌入矩阵
if self.word_vectors is not None:
embedding_w = tf.Variable(tf.cast(self.word_vectors, dtype=tf.float32, name="word2vec"), name="embedding_w")
else:
embedding_w = tf.get_variable("embedding_w", shape=[self.vocab_size, self.config["embedding_size"]],initializer=tf.contrib.layers.xavier_initializer())
# 利用词嵌入矩阵将输入的数据中的词转换成词向量,维度[batch_size, sequence_length, embedding_size]
embedded_words = tf.nn.embedding_lookup(embedding_w, self.inputs)
# 定义两层双向LSTM的模型结构
with tf.name_scope("Bi-LSTM"):
for idx, hidden_size in enumerate(self.config["hidden_sizes"]):
with tf.name_scope("Bi-LSTM" + str(idx)):
# 定义前向LSTM结构
lstm_fw_cell = tf.nn.rnn_cell.DropoutWrapper(
tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True),
output_keep_prob=self.keep_prob)
# 定义反向LSTM结构
lstm_bw_cell = tf.nn.rnn_cell.DropoutWrapper(
tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True),
output_keep_prob=self.keep_prob)
# 采用动态rnn,可以动态的输入序列的长度,若没有输入,则取序列的全长
# outputs是一个元祖(output_fw, output_bw),其中两个元素的维度都是[batch_size, max_time, hidden_size],
# fw和bw的hidden_size一样
# self.current_state 是最终的状态,二元组(state_fw, state_bw),state_fw=[batch_size, s],s是一个元祖(h, c)
outputs, current_state = tf.nn.bidirectional_dynamic_rnn(lstm_fw_cell, lstm_bw_cell,
embedded_words, dtype=tf.float32,
scope="bi-lstm" + str(idx))
# 对outputs中的fw和bw的结果拼接 [batch_size, time_step, hidden_size * 2]
embedded_words = tf.concat(outputs, 2)
# 将最后一层Bi-LSTM输出的结果分割成前向和后向的输出
outputs = tf.split(embedded_words, 2, -1)
# 在Bi-LSTM+Attention的论文中,将前向和后向的输出相加
with tf.name_scope("Attention"):
H = outputs[0] + outputs[1]
# 得到Attention的输出
output = self._attention(H)
output_size = self.config["hidden_sizes"][-1]
# 全连接层的输出
with tf.name_scope("output"):
output_w = tf.get_variable(
"output_w",
shape=[output_size, self.config["num_classes"]],
initializer=tf.contrib.layers.xavier_initializer())
output_b = tf.Variable(tf.constant(0.1, shape=[self.config["num_classes"]]), name="output_b")
self.l2_loss += tf.nn.l2_loss(output_w)
self.l2_loss += tf.nn.l2_loss(output_b)
self.logits = tf.nn.xw_plus_b(output, output_w, output_b, name="logits")
self.predictions = self.get_predictions()
self.loss = self.cal_loss()
self.train_op, self.summary_op = self.get_train_op()
def _attention(self, H):
"""
利用Attention机制得到句子的向量表示
"""
# 获得最后一层LSTM的神经元数量
hidden_size = self.config["hidden_sizes"][-1]
# 初始化一个权重向量,是可训练的参数
W = tf.Variable(tf.random_normal([hidden_size], stddev=0.1))
# 对Bi-LSTM的输出用激活函数做非线性转换
M = tf.tanh(H)
# 对W和M做矩阵运算,M=[batch_size, time_step, hidden_size],计算前做维度转换成[batch_size * time_step, hidden_size]
# newM = [batch_size, time_step, 1],每一个时间步的输出由向量转换成一个数字
newM = tf.matmul(tf.reshape(M, [-1, hidden_size]), tf.reshape(W, [-1, 1]))
# 对newM做维度转换成[batch_size, time_step]
restoreM = tf.reshape(newM, [-1, self.config["sequence_length"]])
# 用softmax做归一化处理[batch_size, time_step]
self.alpha = tf.nn.softmax(restoreM)
# 利用求得的alpha的值对H进行加权求和,用矩阵运算直接操作
r = tf.matmul(tf.transpose(H, [0, 2, 1]), tf.reshape(self.alpha, [-1, self.config["sequence_length"], 1]))
# 将三维压缩成二维sequeezeR=[batch_size, hidden_size]
sequeezeR = tf.squeeze(r)
sentenceRepren = tf.tanh(sequeezeR)
# 对Attention的输出可以做dropout处理
output = tf.nn.dropout(sentenceRepren, self.keep_prob)
return output
base.py
import tensorflow as tf
import numpy as np
class BaseModel(object):
def __init__(self, config, vocab_size=None, word_vectors=None):
"""
文本分类的基类,提供了各种属性和训练,验证,测试的方法
:param config: 模型的配置参数
:param vocab_size: 当不提供词向量的时候需要vocab_size来初始化词向量
:param word_vectors:预训练的词向量,word_vectors 和 vocab_size必须有一个不为None
"""
self.config = config
self.vocab_size = vocab_size
self.word_vectors = word_vectors
self.inputs = tf.placeholder(tf.int32, [None, None], name="inputs") # 数据输入
self.labels = tf.placeholder(tf.float32, [None], name="labels") # 标签
self.keep_prob = tf.placeholder(tf.float32, name="keep_prob") # dropout
self.l2_loss = tf.constant(0.0) # 定义l2损失
self.loss = 0.0 # 损失
self.train_op = None # 训练入口
self.summary_op = None
self.logits = None # 模型最后一层的输出
self.predictions = None # 预测结果
self.saver = None # 保存为ckpt模型的对象
def cal_loss(self):
"""
计算损失,支持二分类和多分类
:return:
"""
with tf.name_scope("loss"):
losses = 0.0
if self.config["num_classes"] == 1:
losses = tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits,
labels=tf.reshape(self.labels, [-1, 1]))
elif self.config["num_classes"] > 1:
self.labels = tf.cast(self.labels, dtype=tf.int32)
losses = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logits,
labels=self.labels)
loss = tf.reduce_mean(losses)
return loss
def get_optimizer(self):
"""
获得优化器
:return:
"""
optimizer = None
if self.config["optimization"] == "adam":
optimizer = tf.train.AdamOptimizer(self.config["learning_rate"])
if self.config["optimization"] == "rmsprop":
optimizer = tf.train.RMSPropOptimizer(self.config["learning_rate"])
if self.config["optimization"] == "sgd":
optimizer = tf.train.GradientDescentOptimizer(self.config["learning_rate"])
return optimizer
def get_train_op(self):
"""
获得训练的入口
:return:
"""
# 定义优化器
optimizer = self.get_optimizer()
trainable_params = tf.trainable_variables()
gradients = tf.gradients(self.loss, trainable_params)
# 对梯度进行梯度截断
clip_gradients, _ = tf.clip_by_global_norm(gradients, self.config["max_grad_norm"])
train_op = optimizer.apply_gradients(zip(clip_gradients, trainable_params))
tf.summary.scalar("loss", self.loss)
summary_op = tf.summary.merge_all()
return train_op, summary_op
def get_predictions(self):
"""
得到预测结果
:return:
"""
predictions = None
if self.config["num_classes"] == 1:
predictions = tf.cast(tf.greater_equal(self.logits, 0.0), tf.int32, name="predictions")
elif self.config["num_classes"] > 1:
predictions = tf.argmax(self.logits, axis=-1, name="predictions")
return predictions
def build_model(self):
"""
创建模型
:return:
"""
raise NotImplementedError
def init_saver(self):
"""
初始化saver对象
:return:
"""
self.saver = tf.train.Saver(tf.global_variables())
def train(self, sess, batch, dropout_prob):
"""
训练模型
:param sess: tf的会话对象
:param batch: batch数据
:param dropout_prob: dropout比例
:return: 损失和预测结果
"""
feed_dict = {self.inputs: batch["x"],
self.labels: batch["y"],
self.keep_prob: dropout_prob}
# 训练模型
_, summary, loss, predictions = sess.run([self.train_op, self.summary_op, self.loss, self.predictions],
feed_dict=feed_dict)
return summary, loss, predictions
def eval(self, sess, batch):
"""
验证模型
:param sess: tf中的会话对象
:param batch: batch数据
:return: 损失和预测结果
"""
feed_dict = {self.inputs: batch["x"],
self.labels: batch["y"],
self.keep_prob: 1.0}
summary, loss, predictions = sess.run([self.summary_op, self.loss, self.predictions], feed_dict=feed_dict)
return summary, loss, predictions
def infer(self, sess, inputs):
"""
预测新数据
:param sess: tf中的会话对象
:param inputs: batch数据
:return: 预测结果
"""
feed_dict = {self.inputs: np.array([inputs]),
self.keep_prob: 1.0}
predict = sess.run(self.predictions, feed_dict=feed_dict)
return predict
以上是关于双向LSTM模型的tensorflow实现的主要内容,如果未能解决你的问题,请参考以下文章
在 TensorFlow 中使用 LSTM-CGAN 生成 MNIST 数字
第二十一节,使用TensorFlow实现LSTM和GRU网络