有状态 LSTM 和流预测
Posted
技术标签:
【中文标题】有状态 LSTM 和流预测【英文标题】:Stateful LSTM and stream predictions 【发布时间】:2019-04-10 22:18:18 【问题描述】:我已经在多批次的 7 个样本上训练了一个 LSTM 模型(使用 Keras 和 TF 构建),每个批次有 3 个特征,形状类似于下面的样本(下面的数字只是用于解释目的的占位符),每批次标记为 0 或 1:
数据:
[
[[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
[[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
[[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3],[1,2,3]]
...
]
即:m个序列的批次,每个长度为7,其元素是3维向量(因此批次具有形状(m73))
目标:
[
[1]
[0]
[1]
...
]
在我的生产环境中,数据是具有 3 个特征的样本流 ([1,2,3],[1,2,3]...
)。我想在每个样本到达我的模型时对其进行流式传输并获得中间概率,而无需等待整个批次 (7) - 请参阅下面的动画。
我的一个想法是用 0 填充缺失样本的批次,
[[0,0,0],[0,0,0],[0,0,0],[0,0,0],[0,0,0],[0,0,0],[1,2,3]]
但这似乎效率低下。
如果有任何帮助可以为我指明正确的方向,即以持久的方式保存 LSTM 中间状态,同时等待下一个样本并使用部分数据在特定批量大小上训练的模型进行预测,我将不胜感激。
更新,包括型号代码:
opt = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=10e-8, decay=0.001)
model = Sequential()
num_features = data.shape[2]
num_samples = data.shape[1]
first_lstm = LSTM(32, batch_input_shape=(None, num_samples, num_features),
return_sequences=True, activation='tanh')
model.add(first_lstm)
model.add(LeakyReLU())
model.add(Dropout(0.2))
model.add(LSTM(16, return_sequences=True, activation='tanh'))
model.add(Dropout(0.2))
model.add(LeakyReLU())
model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer=opt,
metrics=['accuracy', keras_metrics.precision(),
keras_metrics.recall(), f1])
模型总结:
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
lstm_1 (LSTM) (None, 100, 32) 6272
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU) (None, 100, 32) 0
_________________________________________________________________
dropout_1 (Dropout) (None, 100, 32) 0
_________________________________________________________________
lstm_2 (LSTM) (None, 100, 16) 3136
_________________________________________________________________
dropout_2 (Dropout) (None, 100, 16) 0
_________________________________________________________________
leaky_re_lu_2 (LeakyReLU) (None, 100, 16) 0
_________________________________________________________________
flatten_1 (Flatten) (None, 1600) 0
_________________________________________________________________
dense_1 (Dense) (None, 1) 1601
=================================================================
Total params: 11,009
Trainable params: 11,009
Non-trainable params: 0
_________________________________________________________________
【问题讨论】:
@ShlomiSchwartz 我在答案的开头添加了一条注释。请阅读以确保您走在正确的轨道上。 @ShlomiSchwartz This 可能有助于理解差异。 @ShlomiSchwartz 你的训练模型是否有状态并不重要。您可以使用涉及设置stateful=True
的解决方案,通过将训练模型的权重复制到推理模型并在@DanielMöller 的答案中启用有状态。
您好@ShlomiSchwartz,我根据您的模型架构更新了我的答案,请看一下。
如果批次是较长序列的一部分,则您的模型应该是有状态的,或者您应该使用包含“整个”序列的批次。请注意,在所有情况下,模型都不会看到同一批次中序列之间的任何关系。有状态模型将一个批次连接到另一批次。我建议详细查看链接中的答案,以准确了解 keras 如何解释您的数据:***.com/questions/38714959/understanding-keras-lstms/…
【参考方案1】:
如果我理解正确,您有一批m
序列,每个长度为 7,其元素是 3 维向量(因此批的形状为 (m*7*3)
)。
在任何Keras RNN 中,您都可以设置
return_sequences
标记到True
成为中间状态,即对于每个批次,而不是最终预测,您将获得相应的 7 个输出,其中输出 i
表示在给定所有输入的情况下在阶段 i
的预测从 0 到 i
。
但你最终会一下子得到所有。据我所知,Keras 不提供用于在处理批处理时检索吞吐量的直接接口。如果您使用任何 CUDNN
-optimized 变体,这可能会受到更多限制。你可以做的基本上是将你的batch视为7个连续的batch形状(m*1*3)
,并逐步将它们馈送到你的LSTM,记录每一步的隐藏状态和预测。为此,您可以将return_state
设置为True
并手动执行,也可以简单地将stateful
设置为True
并让对象跟踪它。
下面的 Python2+Keras 示例应该准确地代表您想要的。具体来说:
允许以持久的方式保存整个 LSTM 中间状态 等待下一个样本时 并在一个模型上进行预测,该模型是根据特定批量大小训练的,该批量大小可能是任意且未知的。为此,它包含一个stateful=True
的示例,用于最简单的训练,return_state=True
用于最精确的推理,因此您可以了解这两种方法。它还假设您获得了一个已序列化的模型,并且您对此了解不多。结构与吴恩达的课程密切相关,在这个话题上他肯定比我更有权威。由于您没有指定模型的训练方式,因此我假设了多对一的训练设置,但这很容易适应。
from __future__ import print_function
from keras.layers import Input, LSTM, Dense
from keras.models import Model, load_model
from keras.optimizers import Adam
import numpy as np
# globals
SEQ_LEN = 7
HID_DIMS = 32
OUTPUT_DIMS = 3 # outputs are assumed to be scalars
##############################################################################
# define the model to be trained on a fixed batch size:
# assume many-to-one training setup (otherwise set return_sequences=True)
TRAIN_BATCH_SIZE = 20
x_in = Input(batch_shape=[TRAIN_BATCH_SIZE, SEQ_LEN, 3])
lstm = LSTM(HID_DIMS, activation="tanh", return_sequences=False, stateful=True)
dense = Dense(OUTPUT_DIMS, activation='linear')
m_train = Model(inputs=x_in, outputs=dense(lstm(x_in)))
m_train.summary()
# a dummy batch of training data of shape (TRAIN_BATCH_SIZE, SEQ_LEN, 3), with targets of shape (TRAIN_BATCH_SIZE, 3):
batch123 = np.repeat([[1, 2, 3]], SEQ_LEN, axis=0).reshape(1, SEQ_LEN, 3).repeat(TRAIN_BATCH_SIZE, axis=0)
targets = np.repeat([[123,234,345]], TRAIN_BATCH_SIZE, axis=0) # dummy [[1,2,3],,,]-> [123,234,345] mapping to be learned
# train the model on a fixed batch size and save it
print(">> INFERECE BEFORE TRAINING MODEL:", m_train.predict(batch123, batch_size=TRAIN_BATCH_SIZE, verbose=0))
m_train.compile(optimizer=Adam(lr=0.5), loss='mean_squared_error', metrics=['mae'])
m_train.fit(batch123, targets, epochs=100, batch_size=TRAIN_BATCH_SIZE)
m_train.save("trained_lstm.h5")
print(">> INFERECE AFTER TRAINING MODEL:", m_train.predict(batch123, batch_size=TRAIN_BATCH_SIZE, verbose=0))
##############################################################################
# Now, although we aren't training anymore, we want to do step-wise predictions
# that do alter the inner state of the model, and keep track of that.
m_trained = load_model("trained_lstm.h5")
print(">> INFERECE AFTER RELOADING TRAINED MODEL:", m_trained.predict(batch123, batch_size=TRAIN_BATCH_SIZE, verbose=0))
# now define an analogous model that allows a flexible batch size for inference:
x_in = Input(shape=[SEQ_LEN, 3])
h_in = Input(shape=[HID_DIMS])
c_in = Input(shape=[HID_DIMS])
pred_lstm = LSTM(HID_DIMS, activation="tanh", return_sequences=False, return_state=True, name="lstm_infer")
h, cc, c = pred_lstm(x_in, initial_state=[h_in, c_in])
prediction = Dense(OUTPUT_DIMS, activation='linear', name="dense_infer")(h)
m_inference = Model(inputs=[x_in, h_in, c_in], outputs=[prediction, h,cc,c])
# Let's confirm that this model is able to load the trained parameters:
# first, check that the performance from scratch is not good:
print(">> INFERENCE BEFORE SWAPPING MODEL:")
predictions, hs, zs, cs = m_inference.predict([batch123,
np.zeros((TRAIN_BATCH_SIZE, HID_DIMS)),
np.zeros((TRAIN_BATCH_SIZE, HID_DIMS))],
batch_size=1)
print(predictions)
# import state from the trained model state and check that it works:
print(">> INFERENCE AFTER SWAPPING MODEL:")
for layer in m_trained.layers:
if "lstm" in layer.name:
m_inference.get_layer("lstm_infer").set_weights(layer.get_weights())
elif "dense" in layer.name:
m_inference.get_layer("dense_infer").set_weights(layer.get_weights())
predictions, _, _, _ = m_inference.predict([batch123,
np.zeros((TRAIN_BATCH_SIZE, HID_DIMS)),
np.zeros((TRAIN_BATCH_SIZE, HID_DIMS))],
batch_size=1)
print(predictions)
# finally perform granular predictions while keeping the recurrent activations. Starting the sequence with zeros is a common practice, but depending on how you trained, you might have an <END_OF_SEQUENCE> character that you might want to propagate instead:
h, c = np.zeros((TRAIN_BATCH_SIZE, HID_DIMS)), np.zeros((TRAIN_BATCH_SIZE, HID_DIMS))
for i in range(len(batch123)):
# about output shape: https://keras.io/layers/recurrent/#rnn
# h,z,c hold the network's throughput: h is the proper LSTM output, c is the accumulator and cc is (probably) the candidate
current_input = batch123[i:i+1] # the length of this feed is arbitrary, doesn't have to be 1
pred, h, cc, c = m_inference.predict([current_input, h, c])
print("input:", current_input)
print("output:", pred)
print(h.shape, cc.shape, c.shape)
raw_input("do something with your prediction and hidden state and press any key to continue")
附加信息:
因为我们有两种形式的状态持久性:
1. 模型的保存/训练参数对于每个序列都相同
2.a
、c
状态在整个序列中不断演变,可能会“重新启动”
看看 LSTM 对象的内脏是很有趣的。在我提供的 Python 示例中,a
和 c
权重被显式处理,但训练的参数不是,而且它们是如何在内部实现或它们的含义可能并不明显。可以按如下方式对其进行检查:
for w in lstm.weights:
print(w.name, w.shape)
在我们的例子中(32 个隐藏状态)返回以下内容:
lstm_1/kernel:0 (3, 128)
lstm_1/recurrent_kernel:0 (32, 128)
lstm_1/bias:0 (128,)
我们观察到的维度是 128。这是为什么呢? this link 描述 Keras LSTM 实现如下:
g 是循环激活,p 是激活,Ws 是核,Us 是循环核,h 是隐藏变量,也是输出,符号 * 是逐元素乘法。
这解释了128=32*4
是发生在 4 个门中的每一个门内的仿射变换的参数,串联:
(3, 128)
(名为kernel
)的矩阵处理给定序列元素的输入
形状矩阵(32, 128)
(命名为recurrent_kernel
)处理最后一个循环状态h
的输入。
形状矢量 (128,)
(命名为 bias
),与任何其他 NN 设置一样。
【讨论】:
谢谢,这个惊人的解释,我会试一试,并会更新。 如果我使用您将批次拆分为单个样本的建议,这是否意味着我需要在相同的输入上重新训练我的模型并在 (m*1*3) 形状上进行训练,就像这样?[[1,2,3],[1,2,3]...7 samples]
& 以相应的批次标签为目标,像这样吗? [1,1,1,1,1,1,1,0,0,0,0,0,0,0]
其中每 7 个标签对于每个批次都是相同的。
如果保持隐藏状态,执行m
一元素前向传播相当于执行一次m
-元素前向传播。但是,如果您想要 batch_size>1
的(加速和正则化)优势,您 确实 必须等到您获得所有结果然后对它们进行平均(如果我正确理解您的问题,您不想避免这样做)。为此,您可以有两种设置,一种用于训练批处理,一种用于生产循环,或者您可以将concatenator 添加到for
方法以人为地建立您的批处理
换句话说,如果你用批次训练它并且进展顺利,你仍然可以使用相同的模型来执行这样的一一预测:只要确保 c
隐藏状态保持不变,h
-state 适应当前情况(取决于你的训练方式,你可能有一个“初始状态”来开始一个新的序列,或者一个你可以推动的“序列结束”字符反复重置h
-state)
最好的方法是使用Model,就像我的例子一样,那么你就有很多内置的序列化支持。一旦你有了 numpy、json 或任何标准格式,后端几乎是一个选择问题(HDF5 正是文件系统和 kvdb 之间的折衷)【参考方案2】:
注意:此答案假设您的模型在训练阶段不是有状态的。您必须了解什么是有状态 RNN 层,并确保训练数据具有相应的有状态属性。简而言之,这意味着序列之间存在依赖关系,即一个序列是您要在模型中考虑的另一个序列的后续。如果您的模型和训练数据是有状态的,那么我认为其他涉及从一开始就为 RNN 层设置 stateful=True
的答案更简单。
更新:无论训练模型是否有状态,您始终可以将其权重复制到推理模型并启用有状态。所以我认为基于设置stateful=True
的解决方案比我的更短更好。它们唯一的缺点是这些解决方案中的批量大小必须固定。
请注意,LSTM 层在单个序列上的输出由其固定的权重矩阵及其内部状态(取决于先前处理的时间步长)确定。现在要获得长度为m
的单个序列的 LSTM 层的输出,一种明显的方法是将整个序列一次性馈送到 LSTM 层。然而,正如我之前所说,由于它的内部状态取决于前一个时间步长,我们可以利用这一事实并通过在处理块结束时获取 LSTM 层的状态并将其传递给 LSTM 来逐块提供单个序列块用于处理下一个块的层。为了更清楚,假设序列长度为 7(即它有 7 个时间步长的固定长度特征向量)。例如,可以像这样处理这个序列:
-
将时间步长 1 和 2 馈送到 LSTM 层;获取最终状态(称为
C1
)。
将时间步长 3、4 和 5 和状态 C1
作为初始状态提供给 LSTM 层;获取最终状态(称为C2
)。
将时间步长 6 和 7 并将 C2
作为初始状态提供给 LSTM 层;得到最终输出。
如果我们一次输入整个 7 个时间步长,则最终输出等同于 LSTM 层产生的输出。
所以要在 Keras 中实现这一点,可以将 LSTM 层的return_state
参数设置为True
,这样就可以得到中间状态。此外,在定义输入层时不要指定固定的时间步长。相反,使用None
能够为模型提供任意长度的序列,这使我们能够逐步处理每个序列(如果您在训练时的输入数据是固定长度的序列,那很好)。
由于您在推理时需要这种卡盘处理能力,我们需要定义一个新模型,该模型共享训练模型中使用的 LSTM 层,可以获取初始状态作为输入,也可以将结果状态作为输出。下面是它可以做的大概草图(注意,LSTM层的返回状态在训练模型时没有使用,我们只在测试时需要它):
# define training model
train_input = Input(shape=(None, n_feats)) # note that the number of timesteps is None
lstm_layer = LSTM(n_units, return_state=True)
lstm_output, _, _ = lstm_layer(train_input) # note that we ignore the returned states
classifier = Dense(1, activation='sigmoid')
train_output = classifier(lstm_output)
train_model = Model(train_input, train_output)
# compile and fit the model on training data ...
# ==================================================
# define inference model
inf_input = Input(shape=(None, n_feats))
state_h_input = Input(shape=(n_units,))
state_c_input = Input(shape=(n_units,))
# we use the layers of previous model
lstm_output, state_h, state_c = lstm_layer(inf_input,
initial_state=[state_h_input, state_c_input])
output = classifier(lstm_output)
inf_model = Model([inf_input, state_h_input, state_c_input],
[output, state_h, state_c]) # note that we return the states as output
现在,您可以向inf_model
提供尽可能多的序列时间步长。但是,请注意,最初您必须为状态提供全零向量(这是状态的默认初始值)。例如,如果序列长度为 7,那么当有新数据流可用时会发生什么情况的示意图如下:
state_h = np.zeros((1, n_units,))
state_c = np.zeros((1, n_units))
# three new timesteps are available
outputs = inf_model.predict([timesteps, state_h, state_c])
out = output[0,0] # you may ignore this output since the entire sequence has not been processed yet
state_h = outputs[0,1]
state_c = outputs[0,2]
# after some time another four new timesteps are available
outputs = inf_model.predict([timesteps, state_h, state_c])
# we have processed 7 timesteps, so the output is valid
out = output[0,0] # store it, pass it to another thread or do whatever you want to do with it
# reinitialize the state to make them ready for the next sequence chunk
state_h = np.zeros((1, n_units))
state_c = np.zeros((1, n_units))
# to be continued...
当然,您需要在某种循环中执行此操作或实现控制流结构来处理数据流,但我认为您应该明白大致的想法。
最后,虽然您的具体示例不是序列到序列模型,但我强烈建议阅读official Keras seq2seq tutorial,我认为可以从中学到很多想法。
【讨论】:
感谢您的回复,这很有教育意义 你能看看这里吗***.com/questions/53376761/… ?: :)【参考方案3】:我认为可能有更简单的解决方案。
如果您的模型没有卷积层或任何其他作用于长度/步数维度的层,您可以简单地将其标记为 stateful=True
警告:您的模型具有作用于长度维度的层!!
Flatten
层将长度维度转换为特征维度。这将完全阻止您实现目标。如果Flatten
层需要 7 个步骤,那么您将始终需要 7 个步骤。
因此,在应用我下面的答案之前,请修复您的模型以不使用 Flatten
层。相反,它可以只删除 last LSTM 层的return_sequences=True
。
下面的代码解决了这个问题,并准备了一些与下面的答案一起使用的东西:
def createModel(forTraining):
#model for training, stateful=False, any batch size
if forTraining == True:
batchSize = None
stateful = False
#model for predicting, stateful=True, fixed batch size
else:
batchSize = 1
stateful = True
model = Sequential()
first_lstm = LSTM(32,
batch_input_shape=(batchSize, num_samples, num_features),
return_sequences=True, activation='tanh',
stateful=stateful)
model.add(first_lstm)
model.add(LeakyReLU())
model.add(Dropout(0.2))
#this is the last LSTM layer, use return_sequences=False
model.add(LSTM(16, return_sequences=False, stateful=stateful, activation='tanh'))
model.add(Dropout(0.2))
model.add(LeakyReLU())
#don't add a Flatten!!!
#model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))
if forTraining == True:
compileThisModel(model)
有了这个,你将能够用 7 个步骤训练并用一个步骤进行预测。否则将不可能。
使用有状态模型解决您的问题
首先,再次训练这个新模型,因为它没有 Flatten 层:
trainingModel = createModel(forTraining=True)
trainThisModel(trainingModel)
现在,使用这个经过训练的模型,您可以像创建经过训练的模型一样简单地创建一个新模型,但在其所有 LSTM 层中标记 stateful=True
。我们应该从训练好的模型中复制权重。
由于这些新层需要一个固定的批量大小(Keras 的规则),我假设它会是 1(一个流,而不是 m 个流)并将其添加到上面的模型创建中。
predictingModel = createModel(forTraining=False)
predictingModel.set_weights(trainingModel.get_weights())
然后瞧。只需一步预测模型的输出:
pseudo for loop as samples arrive to your model:
prob = predictingModel.predict_on_batch(sample)
#where sample.shape == (1, 1, 3)
当您决定到达您认为的连续序列的末尾时,请致电 predictingModel.reset_states()
,这样您就可以安全地开始一个新序列,而模型不会认为它应该在前一个序列的末尾进行修正。
保存和加载状态
只需获取并设置它们,使用 h5py 保存:
def saveStates(model, saveName):
f = h5py.File(saveName,'w')
for l, lay in enumerate(model.layers):
#if you have nested models,
#consider making this recurrent testing for layers in layers
if isinstance(lay,RNN):
for s, stat in enumerate(lay.states):
f.create_dataset('states_' + str(l) + '_' + str(s),
data=K.eval(stat),
dtype=K.dtype(stat))
f.close()
def loadStates(model, saveName):
f = h5py.File(saveName, 'r')
allStates = list(f.keys())
for stateKey in allStates:
name, layer, state = stateKey.split('_')
layer = int(layer)
state = int(state)
K.set_value(model.layers[layer].states[state], f.get(stateKey))
f.close()
保存/加载状态的工作测试
import h5py, numpy as np
from keras.layers import RNN, LSTM, Dense, Input
from keras.models import Model
import keras.backend as K
def createModel():
inp = Input(batch_shape=(1,None,3))
out = LSTM(5,return_sequences=True, stateful=True)(inp)
out = LSTM(2, stateful=True)(out)
out = Dense(1)(out)
model = Model(inp,out)
return model
def saveStates(model, saveName):
f = h5py.File(saveName,'w')
for l, lay in enumerate(model.layers):
#if you have nested models, consider making this recurrent testing for layers in layers
if isinstance(lay,RNN):
for s, stat in enumerate(lay.states):
f.create_dataset('states_' + str(l) + '_' + str(s), data=K.eval(stat), dtype=K.dtype(stat))
f.close()
def loadStates(model, saveName):
f = h5py.File(saveName, 'r')
allStates = list(f.keys())
for stateKey in allStates:
name, layer, state = stateKey.split('_')
layer = int(layer)
state = int(state)
K.set_value(model.layers[layer].states[state], f.get(stateKey))
f.close()
def printStates(model):
for l in model.layers:
#if you have nested models, consider making this recurrent testing for layers in layers
if isinstance(l,RNN):
for s in l.states:
print(K.eval(s))
model1 = createModel()
model2 = createModel()
model1.predict_on_batch(np.ones((1,5,3))) #changes model 1 states
print('model1')
printStates(model1)
print('model2')
printStates(model2)
saveStates(model1,'testStates5')
loadStates(model2,'testStates5')
print('model1')
printStates(model1)
print('model2')
printStates(model2)
数据方面的考虑
在您的第一个模型中(如果是stateful=False
),它认为m
中的每个序列都是独立的,并且不与其他序列相连。它还认为每个批次都包含唯一的序列。
如果不是这种情况,您可能希望改为训练有状态模型(考虑到每个序列实际上都连接到前一个序列)。然后你需要m
1 个序列的批次。 -> m x (1, 7 or None, 3)
.
【讨论】:
我认为reset_states()
会弄乱训练的参数,但我可以确认它不会,所以这与predict_on_batch
结合使用(它克服了必须为@987654341 指定batch_size 的问题@) 使它成为一个非常紧凑和优雅的解决方案,我一定会自己使用!较少冗长的缺点是它没有满足saving the LSTM intermediate state in a persistent way
的要求
@fr_andres ,现在保存和加载状态。
@DanielMöller 我对这里提到的有状态解决方案有疑问。如果您能告诉我我错在哪里,我将不胜感激:据我所知,有状态 LSTM 层意味着一批中的一个样本是上一批中相应样本的后继(来自keras docs:如果x1
和x2
是连续批次的样本,那么x2[i]
是x1[i]
的后续序列,对于每个i
)。这就是您正确提到批量大小必须相同的原因。但是,OP 没有提到 >>>>
@DanielMöller >>> 状态假设适用于训练数据和训练阶段。所以从一开始就设置stateful=True
(即在训练模型中)可能会完全破坏学习过程。如果您没有在训练模型中设置stateful=True
参数,则无法在其副本模型中对其进行修改以进行推理阶段。我错过了什么?
@DanielMöller 我认为您可以像在答案中那样在推理模型中设置权重,并仅在其中启用状态性。我得到了我的答案。谢谢。【参考方案4】:
据我所知,由于 Tensorflow 中的静态图,没有有效的方法来提供与训练输入长度不同长度的输入。
填充是解决此问题的官方方法,但效率较低且占用内存。我建议您查看 Pytorch,这对于解决您的问题将是微不足道的。
有很多great posts用Pytorch搭建lstm,看到动态图的好处就明白了。
【讨论】:
以上是关于有状态 LSTM 和流预测的主要内容,如果未能解决你的问题,请参考以下文章