Keras:将MDN层添加到LSTM网络

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Keras:将MDN层添加到LSTM网络相关的知识,希望对你有一定的参考价值。

我的问题简要说明:根据舞蹈序列训练数据,下面详述的长短期记忆网络是否经过适当设计以产生新的舞蹈序列?

背景:我正在与一位希望使用神经网络产生新舞蹈序列的舞者合作。她向我发送了2016 chor-rnn paper,它使用最后的混合密度网络层的LSTM网络完成了这项任务。然而,在我的LSTM网络中添加MDN层之后,我的损失变为负值,结果看起来很混乱。这可能是由于非常小的训练数据,但我想在扩大训练数据大小之前验证模型基础。如果有人可以建议下面的模型是否忽略了一些基本的东西(很有可能),我会非常感谢他们的反馈。

我输入网络的样本数据(下面的X)具有形状(626,55,3),其对应于55个身体位置的626个时间快照,每个具有3个坐标(x,y,然后是z)。所以X1 [11] [2]是时间1的第11个身体部位的z位置:

import requests
import numpy as np

# download the data
requests.get('https://s3.amazonaws.com/duhaime/blog/dancing-with-robots/dance.npy')

# X.shape = time_intervals, n_body_parts, 3
X = np.load('dance.npy')

为了确保正确提取数据,我想象了X的前几帧:

import mpl_toolkits.mplot3d.axes3d as p3
import matplotlib.pyplot as plt
from IPython.display import html
from matplotlib import animation
import matplotlib

matplotlib.rcParams['animation.embed_limit'] = 2**128

def update_points(time, points, X):
  arr = np.array([[ X[time][i][0], X[time][i][1] ] for i in range(int(X.shape[1]))])
  points.set_offsets(arr) # set x, y values
  points.set_3d_properties(X[time][:,2][:], zdir='z') # set z value

def get_plot(X, lim=2, frames=200, duration=45):
  fig = plt.figure()
  ax = p3.Axes3D(fig)
  ax.set_xlim(-lim, lim)
  ax.set_ylim(-lim, lim)
  ax.set_zlim(-lim, lim)
  points = ax.scatter(X[0][:,0][:], X[0][:,1][:], X[0][:,2][:], depthshade=False) # x,y,z vals
  return animation.FuncAnimation(fig,
    update_points,
    frames,
    interval=duration,
    fargs=(points, X),
    blit=False  
  ).to_jshtml()

HTML(get_plot(X, frames=int(X.shape[0])))

这会产生一个像这样的小舞蹈序列:

enter image description here

到现在为止还挺好。接下来,我将x,y和z维度的特征居中:

X -= np.amin(X, axis=(0, 1))
X /= np.amax(X, axis=(0, 1))

使用X可视化生成的HTML(get_plot(X, frames=int(X.shape[0])))显示这些线条使数据居中。接下来,我使用Keras中的Sequential API构建模型本身:

from keras.models import Sequential, Model
from keras.layers import Dense, LSTM, Dropout, Activation
from keras.layers.advanced_activations import LeakyReLU
from keras.losses import mean_squared_error
from keras.optimizers import Adam
import keras, os

# config
look_back = 32 # number of previous time frames to use to predict the positions at time i
lstm_cells = 256 # number of cells in each LSTM "layer"
n_features = int(X.shape[1]) * int(X.shape[2]) # number of coordinate values to be predicted by each of `m` models
input_shape = (look_back, n_features) # shape of inputs
m = 32 # number of gaussian models to build

# set boolean controlling whether we use MDN or not
use_mdn = True

model = Sequential()
model.add(LSTM(lstm_cells, return_sequences=True, input_shape=input_shape))
model.add(LSTM(lstm_cells, return_sequences=True))
model.add(LSTM(lstm_cells))

if use_mdn:
  model.add(MDN(n_features, m))
  model.compile(loss=get_mixture_loss_func(n_features, m), optimizer=Adam(lr=0.000001))
else:
  model.add(Dense(n_features, activation='tanh'))
  model.compile(loss=mean_squared_error, optimizer='sgd')

model.summary()

建立模型后,我将数据安排在X中以备培训。在这里,我们想要通过检查先前look_back时间片上每个身体部位的位置来预测55个身体部位的x,y,z位置:

# get training data in right shape
train_x = []
train_y = []

n_time, n_obs, n_attrs = [int(i) for i in X.shape]

for i in range(look_back, n_time-1, 1):
  train_x.append( X[i-look_back:i].reshape(look_back, n_obs * n_attrs) )
  train_y.append( X[i+1].ravel() )

train_x = np.array(train_x)
train_y = np.array(train_y)

最后我训练模型:

from livelossplot import PlotLossesKeras

# fit the model
model.fit(train_x, train_y, epochs=1024, batch_size=1, callbacks=[PlotLossesKeras()])

训练后,我可视化模型创建的新时间片:

# generate `n_frames` of new output time slices
n_frames = 3000

# seed the data to plot with the first `look_back` animation frames
data = X[0:look_back]

x0, x1, x2 = [int(i) for i in train_x.shape]
d0, d1, d2 = [int(i) for i in data.shape]

for i in range(look_back, n_frames, 1):
  # get the model's prediction for the next position of points at time `i`
  result = model.predict(train_x[i].reshape(1, x1, x2))
  # if using the mixed density network, pull out vals that describe vertex positions
  if use_mdn:
    result = np.apply_along_axis(sample_from_output, 1, result, n_features, m, temp=1.0)
  # reshape the result into the form of rows in `X`
  result = result.reshape(1, d1, d2)
  # push the result into the shape of `train_x` observations
  stacked = np.vstack((data[i-look_back+1:i], result)).reshape(1, x1, x2)
  # add the result to the `train_x` observations
  train_x = np.vstack((train_x, stacked))
  # add the result to the dataset for plotting
  data = np.vstack((data[:i], result))

如果我将use_mdn设置为上面的False而是使用简单的平方误差损失(L2 Loss),那么得到的可视化似乎有点令人毛骨悚然,但仍然具有普遍的人类形状。

但是,如果我将use_mdn设置为True,并使用自定义MDN损失函数,则结果非常奇怪。我认识到MDN层添加了大量需要训练的参数,并且可能需要数量级更多的训练数据来实现与L2损失函数输出一样的人形输出。

也就是说,我想问一下,与神经网络模型合作的其他人是否比我自己更广泛地看到了上述方法的任何根本错误。对这个问题的任何见解都会非常有帮助。

答案

天哪,我得到了它[gist]!这是MDN类:

from keras.layers.advanced_activations import LeakyReLU
from keras.models import Sequential, Model
from keras.layers import Dense, Input, merge, concatenate, Dense, LSTM, CuDNNLSTM
from keras.engine.topology import Layer
from keras import backend as K
import tensorflow_probability as tfp
import tensorflow as tf

# check tfp version, as tfp causes cryptic error if out of date
assert float(tfp.__version__.split('.')[1]) >= 5

class MDN(Layer):
  '''Mixture Density Network with unigaussian kernel'''
  def __init__(self, n_mixes, output_dim, **kwargs):
    self.n_mixes = n_mixes
    self.output_dim = output_dim

    with tf.name_scope('MDN'):
      self.mdn_mus    = Dense(self.n_mixes * self.output_dim, name='mdn_mus')
      self.mdn_sigmas = Dense(self.n_mixes, activation=K.exp, name='mdn_sigmas')
      self.mdn_alphas = Dense(self.n_mixes, activation=K.softmax, name='mdn_alphas')
    super(MDN, self).__init__(**kwargs)

  def build(self, input_shape):
    self.mdn_mus.build(input_shape)
    self.mdn_sigmas.build(input_shape)
    self.mdn_alphas.build(input_shape)
    self.trainable_weights = self.mdn_mus.trainable_weights + \
      self.mdn_sigmas.trainable_weights + \
      self.mdn_alphas.trainable_weights
    self.non_trainable_weights = self.mdn_mus.non_trainable_weights + \
      self.mdn_sigmas.non_trainable_weights + \
      self.mdn_alphas.non_trainable_weights
    self.built = True

  def call(self, x, mask=None):
    with tf.name_scope('MDN'):
      mdn_out = concatenate([
        self.mdn_mus(x),
        self.mdn_sigmas(x),
        self.mdn_alphas(x)
      ], name='mdn_outputs')
    return mdn_out

  def get_output_shape_for(self, input_shape):
    return (input_shape[0], self.output_dim)

  def get_config(self):
    config = 
      'output_dim': self.output_dim,
      'n_mixes': self.n_mixes,
    
    base_config = super(MDN, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))

  def get_loss_func(self):
    def unigaussian_loss(y_true, y_pred):
      mix = tf.range(start = 0, limit = self.n_mixes)
      out_mu, out_sigma, out_alphas = tf.split(y_pred, num_or_size_splits=[
        self.n_mixes * self.output_dim,
        self.n_mixes,
        self.n_mixes,
      ], axis=-1, name='mdn_coef_split')

      def loss_i(i):
        batch_size = tf.shape(out_sigma)[0]
        sigma_i = tf.slice(out_sigma, [0, i], [batch_size, 1], name='mdn_sigma_slice')
        alpha_i = tf.slice(out_alphas, [0, i], [batch_size, 1], name='mdn_alpha_slice')
        mu_i = tf.slice(out_mu, [0, i * self.output_dim], [batch_size, self.output_dim], name='mdn_mu_slice')
        dist = tfp.distributions.Normal(loc=mu_i, scale=sigma_i)
        loss = dist.prob(y_true) # find the pdf around each value in y_true
        loss = alpha_i * loss
        return loss

      result = tf.map_fn(lambda  m: loss_i(m), mix, dtype=tf.float32, name='mix_map_fn')
      result = tf.reduce_sum(result, axis=0, keepdims=False)
      result = -tf.log(result)
      result = tf.reduce_mean(result)
      return result

    with tf.name_scope('MDNLayer'):
      return unigaussian_loss

而LSTM课程:

class LSTM_MDN:
  def __init__(self, n_verts=15, n_dims=3, n_mixes=2, look_back=1, cells=[32,32,32,32], use_mdn=True):
    self.n_verts = n_verts
    self.n_dims = n_dims
    self.n_mixes = n_mixes
    self.look_back = look_back
    self.cells = cells
    self.use_mdn = use_mdn
    self.LSTM = CuDNNLSTM if len(gpus) > 0 else LSTM
    self.model = self.build_model()
    if use_mdn:
      self.model.compile(loss=MDN(n_mixes, n_verts*n_dims).get_loss_func(), optimizer='adam', metrics=['accuracy'])
    else:
      self.model.compile(loss='mse', optimizer='adam', metrics=['accuracy'])

  def build_model(self):
    i = Input((self.look_back, self.n_verts*self.n_dims))
    h = self.LSTM(self.cells[0], return_sequences=True)(i) # return sequences, stateful
    h = self.LSTM(self.cells[1], return_sequences=True)(h)
    h = self.LSTM(self.cells[2])(h)
    h = Dense(self.cells[3])(h)
    if self.use_mdn:
      o = MDN(self.n_mixes, self.n_verts*self.n_dims)(h)
    else:
      o = Dense(self.n_verts*self.n_dims)(h)
    return Model(inputs=[i], outputs=[o])

  def prepare_inputs(self, X, look_back=2):
    '''
    Prepare inputs in shape expected by LSTM
    @returns:
      numpy.ndarray train_X: has shape: n_samples, lookback, verts * dims
      numpy.ndarray train_Y: has shape: n_samples, verts * dims
    '''
    # prepare data for the LSTM_MDN
    X = X.swapaxes(0, 1) # reshape to time, vert, dim
    n_time, n_verts, n_dims = X.shape

    # validate shape attributes
    if n_verts != self.n_verts: raise Exception(' ! got', n_verts, 'vertices, expected', self.n_verts)
    if n_dims != self.n_dims: raise Exception(' ! got', n_dims, 'dims, expected', self.n_dims)
    if look_back != self.look_back: raise Exception(' ! got', look_back, 'for look_back, expected', self.look_back)

    # lstm expects data in shape [samples_in_batch, timestamps, values]
    train_X = []
    train_Y = []
    for i in range(look_back, n_time, 1):
      train_X.append( X[i-look_back:i,:,:].reshape(look_back, n_verts * n_dims) ) # look_back, verts * dims
      train_Y.append( X[i,:,:].reshape(n_verts * n_dims) ) # verts * dims
    train_X = np.array(train_X) # n_samples, lookback, verts * dims
    train_Y = np.array(train_Y) # n_samples, verts * dims
    return [train_X, train_Y]

  def predict_positions(self, input_X):
    '''
    Predict the output for a series of input frames. Each prediction has shape (1, y), where y contains:
      mus = y[:n_mixes*n_verts*n_dims]
      sigs = y[n_mixes*n_verts*n_dims:-n_mixes]
      alphas = softmax(y[-n_mixes:])
    @param numpy.ndarray input_X: has shape: n_samples, look_back, n_verts * n_dims
    @returns:
      numpy.ndarray X: has shape: verts, time, dims
    '''
    predictions = []
    for i in range(input_X.shape[0]):
      y = self.model.predict( train_X[i:i+1] ).squeeze()
      mus = y[:n_mixes*n_verts*n_dims]
      sigs = y[n_mixes*n_verts*n_dims:-n_mixes]
      alphas = self.softmax(y[-n_mixes:])

      # find the most likely distribution then pull out the mus that correspond to that selected index
      alpha_idx = np.argmax(alphas) # 0
      alpha_idx = 0
      predictions.append( mus[alpha_idx*self.n_verts*self.n_dims:(alpha_idx+1)*self.n_verts*self.n_dims] )
    predictions = np.array(predictions).reshape(train_X.shape[0], self.n_verts, self.n_dims).swapaxes(0, 1)
    return predictions # shape = n_verts, n_time, n_dims

  def softmax(self, x):
    ''''Compute softmax values for vector `x`'''
    r = np.exp(x - np.max(x))
    return r / r.sum()

然后设置课程:

X = data.selected.X
n_verts, n_time, n_dims = X.shape
n_mixes = 3
look_back = 2

lstm_mdn = LSTM_MDN(n_verts=n_verts, n_dims=n_dims, n_mixes=n_mixes, look_back=look_back)
train_X, train_Y = lstm_mdn.prepare_inputs(X, look_back=look_back)

上面链接的要点有完整的血腥细节,如果有人想要重现这一点并将其分开以更好地理解机制......

以上是关于Keras:将MDN层添加到LSTM网络的主要内容,如果未能解决你的问题,请参考以下文章

python tensorflow 2.0 不使用 Keras 搭建简单的 LSTM 网络

将 Pytorch LSTM 的状态参数转换为 Keras LSTM

keras中LSTM的units是啥意思

如何在 Keras 的两个 LSTM 层之间添加注意力层

在 LSTM 网络的输入上使用 Masking 时,Keras(TensorFlow 后端)多 GPU 模型(4gpus)失败

如何将 keras LSTM 层的输出输入到输入层?