如何让 Keras LSTM 在多变量设置中对多个时间序列进行预测?

Posted

技术标签:

【中文标题】如何让 Keras LSTM 在多变量设置中对多个时间序列进行预测?【英文标题】:How to have Keras LSTM make predictions for multiple time-series in a multivariate setting? 【发布时间】:2020-06-29 03:21:48 【问题描述】:

我正在尝试使用 Keras 对多个变量进行同时预测。使用this example here,我想预测所有特征的值,包括 pm 2.5、DEWP、TEMP 等,而不仅仅是污染(pm 2.5)。本质上,这是在给定所有变量的情况下,建立一个模型来将所有变量预测为时间序列,而不仅仅是预测一个变量。

我使用重构后的 3D 数据修改了原始示例代码,但出现错误。代码如下:

from sklearn.preprocessing import LabelEncoder, MinMaxScaler

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense


from math import sqrt
from numpy import concatenate
from matplotlib import pyplot
from pandas import read_csv
from pandas import DataFrame
from pandas import concat


from pandas import read_csv, DataFrame, concat
from datetime import datetime
# load data
def parse(x):
    return datetime.strptime(x, '%Y %m %d %H')
dataset = read_csv('raw.csv',  parse_dates = [['year', 'month', 'day', 'hour']], index_col=0, date_parser=parse)
dataset.drop('No', axis=1, inplace=True)
# manually specify column names
dataset.columns = ['pollution', 'dew', 'temp', 'press', 'wnd_dir', 'wnd_spd', 'snow', 'rain']
dataset.index.name = 'date'
# mark all NA values with 0
dataset['pollution'].fillna(0, inplace=True)
# drop the first 24 hours
dataset = dataset[24:]
# summarize first 5 rows
print(dataset.head(5))
# save to file
dataset.to_csv('pollution.csv')


# convert series to supervised learning
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    n_vars = 1 if type(data) is list else data.shape[1]
    df = DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
        else:
            names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)]
    # put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    if dropnan:
        agg.dropna(inplace=True)
    return agg


# load dataset
dataset = read_csv('pollution.csv', header=0, index_col=0)
values = dataset.values
# integer encode direction
encoder = LabelEncoder()
values[:,4] = encoder.fit_transform(values[:,4])
# ensure all data is float
values = values.astype('float32')
# normalize features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# frame as supervised learning
reframed = series_to_supervised(scaled, 1, 1)


# split into train and test sets
values = reframed.values
n_train_hours = 365 * 24
train = values[:n_train_hours, :]
test = values[n_train_hours:, :]
# split into input and outputs
train_X, train_y = train[:, :-8], train[:, -8:]
test_X, test_y = test[:, :-8], test[:, -8:]
# reshape input to be 3D [samples, timesteps, features]
train_X_3d = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
test_X_3d = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
train_y_3d = train_y.reshape((train_y.shape[0], 1, train_y.shape[1]))
test_y_3d = test_y.reshape((test_y.shape[0], 1, test_y.shape[1]))
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
print(train_X_3d.shape, train_y_3d.shape, test_X_3d.shape, test_y_3d.shape)

# design network
model = Sequential()
model.add(LSTM(50, input_shape=(train_X_3d.shape[1], train_X_3d.shape[2])))
#model.add(Dense(1))
model.compile(loss='mae', optimizer='adam')
# fit network
history = model.fit(train_X_3d, train_y_3d, 
                    epochs=50, batch_size=72, 
                    validation_data=(test_X_3d, test_y_3d), verbose=2, shuffle=False)
# plot history
pyplot.plot(history.history['loss'], label='train')
pyplot.plot(history.history['val_loss'], label='test')
pyplot.legend()
pyplot.show()

# make a prediction
yhat = model.predict(test_X_3d)
yhat

输出是:

                     pollution  dew  temp   press wnd_dir  wnd_spd  snow  rain
date                                                                          
2010-01-02 00:00:00      129.0  -16  -4.0  1020.0      SE     1.79     0     0
2010-01-02 01:00:00      148.0  -15  -4.0  1020.0      SE     2.68     0     0
2010-01-02 02:00:00      159.0  -11  -5.0  1021.0      SE     3.57     0     0
2010-01-02 03:00:00      181.0   -7  -5.0  1022.0      SE     5.36     1     0
2010-01-02 04:00:00      138.0   -7  -5.0  1022.0      SE     6.25     2     0
(8760, 8) (8760, 8) (35039, 8) (35039, 8)
(8760, 1, 8) (8760, 1, 8) (35039, 1, 8) (35039, 1, 8)
Train on 8760 samples, validate on 35039 samples
Epoch 1/50
---------------------------------------------------------------------------
InvalidArgumentError                      Traceback (most recent call last)
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in _create_c_op(graph, node_def, inputs, control_inputs)
   1609   try:
-> 1610     c_op = c_api.TF_FinishOperation(op_desc)
   1611   except errors.InvalidArgumentError as e:

InvalidArgumentError: Dimensions must be equal, but are 50 and 8 for 'loss/lstm_loss/sub' (op: 'Sub') with input shapes: [?,50], [?,1,8].

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-1-00c079ab5854> in <module>
     97 history = model.fit(train_X_3d, train_y_3d, 
     98                     epochs=50, batch_size=72,
---> 99                     validation_data=(test_X_3d, test_y_3d), verbose=2, shuffle=False)
    100 # plot history
    101 pyplot.plot(history.history['loss'], label='train')

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
    726         max_queue_size=max_queue_size,
    727         workers=workers,
--> 728         use_multiprocessing=use_multiprocessing)
    729 
    730   def evaluate(self,

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
    322                 mode=ModeKeys.TRAIN,
    323                 training_context=training_context,
--> 324                 total_epochs=epochs)
    325             cbks.make_logs(model, epoch_logs, training_result, ModeKeys.TRAIN)
    326 

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in run_one_epoch(model, iterator, execution_function, dataset_size, batch_size, strategy, steps_per_epoch, num_samples, mode, training_context, total_epochs)
    121         step=step, mode=mode, size=current_batch_size) as batch_logs:
    122       try:
--> 123         batch_outs = execution_function(iterator)
    124       except (StopIteration, errors.OutOfRangeError):
    125         # TODO(kaftan): File bug about tf function and errors.OutOfRangeError?

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in execution_function(input_fn)
     84     # `numpy` translates Tensors to values in Eager mode.
     85     return nest.map_structure(_non_none_constant_value,
---> 86                               distributed_function(input_fn))
     87 
     88   return execution_function

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in __call__(self, *args, **kwds)
    455 
    456     tracing_count = self._get_tracing_count()
--> 457     result = self._call(*args, **kwds)
    458     if tracing_count == self._get_tracing_count():
    459       self._call_counter.called_without_tracing()

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _call(self, *args, **kwds)
    501       # This is the first call of __call__, so we have to initialize.
    502       initializer_map = object_identity.ObjectIdentityDictionary()
--> 503       self._initialize(args, kwds, add_initializers_to=initializer_map)
    504     finally:
    505       # At this point we know that the initialization is complete (or less

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
    406     self._concrete_stateful_fn = (
    407         self._stateful_fn._get_concrete_function_internal_garbage_collected(  # pylint: disable=protected-access
--> 408             *args, **kwds))
    409 
    410     def invalid_creator_scope(*unused_args, **unused_kwds):

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
   1846     if self.input_signature:
   1847       args, kwargs = None, None
-> 1848     graph_function, _, _ = self._maybe_define_function(args, kwargs)
   1849     return graph_function
   1850 

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _maybe_define_function(self, args, kwargs)
   2148         graph_function = self._function_cache.primary.get(cache_key, None)
   2149         if graph_function is None:
-> 2150           graph_function = self._create_graph_function(args, kwargs)
   2151           self._function_cache.primary[cache_key] = graph_function
   2152         return graph_function, args, kwargs

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
   2039             arg_names=arg_names,
   2040             override_flat_arg_shapes=override_flat_arg_shapes,
-> 2041             capture_by_value=self._capture_by_value),
   2042         self._function_attributes,
   2043         # Tell the ConcreteFunction to clean up its graph once it goes out of

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
    913                                           converted_func)
    914 
--> 915       func_outputs = python_func(*func_args, **func_kwargs)
    916 
    917       # invariant: `func_outputs` contains only Tensors, CompositeTensors,

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in wrapped_fn(*args, **kwds)
    356         # __wrapped__ allows AutoGraph to swap in a converted function. We give
    357         # the function a weak reference to itself to avoid a reference cycle.
--> 358         return weak_wrapped_fn().__wrapped__(*args, **kwds)
    359     weak_wrapped_fn = weakref.ref(wrapped_fn)
    360 

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in distributed_function(input_iterator)
     71     strategy = distribution_strategy_context.get_strategy()
     72     outputs = strategy.experimental_run_v2(
---> 73         per_replica_function, args=(model, x, y, sample_weights))
     74     # Out of PerReplica outputs reduce or pick values to return.
     75     all_outputs = dist_utils.unwrap_output_dict(

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in experimental_run_v2(self, fn, args, kwargs)
    758       fn = autograph.tf_convert(fn, ag_ctx.control_status_ctx(),
    759                                 convert_by_default=False)
--> 760       return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    761 
    762   def reduce(self, reduce_op, value, axis):

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in call_for_each_replica(self, fn, args, kwargs)
   1785       kwargs = 
   1786     with self._container_strategy().scope():
-> 1787       return self._call_for_each_replica(fn, args, kwargs)
   1788 
   1789   def _call_for_each_replica(self, fn, args, kwargs):

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in _call_for_each_replica(self, fn, args, kwargs)
   2130         self._container_strategy(),
   2131         replica_id_in_sync_group=constant_op.constant(0, dtypes.int32)):
-> 2132       return fn(*args, **kwargs)
   2133 
   2134   def _reduce_to(self, reduce_op, value, destinations):

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/autograph/impl/api.py in wrapper(*args, **kwargs)
    290   def wrapper(*args, **kwargs):
    291     with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.DISABLED):
--> 292       return func(*args, **kwargs)
    293 
    294   if inspect.isfunction(func) or inspect.ismethod(func):

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in train_on_batch(model, x, y, sample_weight, class_weight, reset_metrics)
    262       y,
    263       sample_weights=sample_weights,
--> 264       output_loss_metrics=model._output_loss_metrics)
    265 
    266   if reset_metrics:

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in train_on_batch(model, inputs, targets, sample_weights, output_loss_metrics)
    309           sample_weights=sample_weights,
    310           training=True,
--> 311           output_loss_metrics=output_loss_metrics))
    312   if not isinstance(outs, list):
    313     outs = [outs]

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _process_single_batch(model, inputs, targets, output_loss_metrics, sample_weights, training)
    250               output_loss_metrics=output_loss_metrics,
    251               sample_weights=sample_weights,
--> 252               training=training))
    253       if total_loss is None:
    254         raise ValueError('The model cannot be run '

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _model_loss(model, inputs, targets, output_loss_metrics, sample_weights, training)
    164 
    165         if hasattr(loss_fn, 'reduction'):
--> 166           per_sample_losses = loss_fn.call(targets[i], outs[i])
    167           weighted_losses = losses_utils.compute_weighted_loss(
    168               per_sample_losses,

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/losses.py in call(self, y_true, y_pred)
    219       y_pred, y_true = tf_losses_util.squeeze_or_expand_dimensions(
    220           y_pred, y_true)
--> 221     return self.fn(y_true, y_pred, **self._fn_kwargs)
    222 
    223   def get_config(self):

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/losses.py in mean_absolute_error(y_true, y_pred)
    781   y_pred = ops.convert_to_tensor(y_pred)
    782   y_true = math_ops.cast(y_true, y_pred.dtype)
--> 783   return K.mean(math_ops.abs(y_pred - y_true), axis=-1)
    784 
    785 

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/ops/math_ops.py in binary_op_wrapper(x, y)
    897     with ops.name_scope(None, op_name, [x, y]) as name:
    898       if isinstance(x, ops.Tensor) and isinstance(y, ops.Tensor):
--> 899         return func(x, y, name=name)
    900       elif not isinstance(y, sparse_tensor.SparseTensor):
    901         try:

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/ops/gen_math_ops.py in sub(x, y, name)
  11086   # Add nodes to the TensorFlow graph.
  11087   _, _, _op = _op_def_lib._apply_op_helper(
> 11088         "Sub", x=x, y=y, name=name)
  11089   _result = _op.outputs[:]
  11090   _inputs_flat = _op.inputs

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/op_def_library.py in _apply_op_helper(self, op_type_name, name, **keywords)
    791         op = g.create_op(op_type_name, inputs, dtypes=None, name=scope,
    792                          input_types=input_types, attrs=attr_protos,
--> 793                          op_def=op_def)
    794       return output_structure, op_def.is_stateful, op
    795 

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in create_op(***failed resolving arguments***)
    546     return super(FuncGraph, self)._create_op_internal(  # pylint: disable=protected-access
    547         op_type, inputs, dtypes, input_types, name, attrs, op_def,
--> 548         compute_device)
    549 
    550   def capture(self, tensor, name=None):

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in _create_op_internal(self, op_type, inputs, dtypes, input_types, name, attrs, op_def, compute_device)
   3427           input_types=input_types,
   3428           original_op=self._default_original_op,
-> 3429           op_def=op_def)
   3430       self._create_op_helper(ret, compute_device=compute_device)
   3431     return ret

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in __init__(self, node_def, g, inputs, output_types, control_inputs, input_types, original_op, op_def)
   1771           op_def, inputs, node_def.attr)
   1772       self._c_op = _create_c_op(self._graph, node_def, grouped_inputs,
-> 1773                                 control_input_ops)
   1774     # pylint: enable=protected-access
   1775 

~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in _create_c_op(graph, node_def, inputs, control_inputs)
   1611   except errors.InvalidArgumentError as e:
   1612     # Convert to ValueError for backwards compatibility.
-> 1613     raise ValueError(str(e))
   1614 
   1615   return c_op

ValueError: Dimensions must be equal, but are 50 and 8 for 'loss/lstm_loss/sub' (op: 'Sub') with input shapes: [?,50], [?,1,8].

数据集可用here。

我的问题是:

Keras LSTM 层不是为此设计的吗?也许它只适用于一个时间序列? 如果 LSTM 很好,有什么好的解决方法? 如果 LSTM 不好,如果我想对特征之间的交互进行建模并同时对所有特征进行预测,我应该使用哪些其他方法?

谢谢!

【问题讨论】:

【参考方案1】:

我找到了解决方案here(在“多个并行系列”下)。我们只需要重塑特征和标签并在网络中输入,它就可以工作了!特征应具有 (n_steps, n_features) 的形状,而标签应具有 (n_samples, n_features) 的形状(如果我们预测 1 个时间步)。

from sklearn.preprocessing import LabelEncoder, MinMaxScaler

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense


from math import sqrt
from numpy import array, concatenate
from matplotlib import pyplot
from pandas import read_csv
from pandas import DataFrame
from pandas import concat


from pandas import read_csv, DataFrame, concat
from datetime import datetime
# load data
def parse(x):
    return datetime.strptime(x, '%Y %m %d %H')
dataset = read_csv('raw.csv',  parse_dates = [['year', 'month', 'day', 'hour']], index_col=0, date_parser=parse)
dataset.drop('No', axis=1, inplace=True)
# manually specify column names
dataset.columns = ['pollution', 'dew', 'temp', 'press', 'wnd_dir', 'wnd_spd', 'snow', 'rain']
dataset.index.name = 'date'
# mark all NA values with 0
dataset['pollution'].fillna(0, inplace=True)
# drop the first 24 hours
dataset = dataset[24:]
# summarize first 5 rows
print(dataset.head(5))
# save to file
dataset.to_csv('pollution.csv')


# load dataset
dataset = read_csv('pollution.csv', header=0, index_col=0)
values = dataset.values
# integer encode direction
encoder = LabelEncoder()
values[:,4] = encoder.fit_transform(values[:,4])
# ensure all data is float
values = values.astype('float32')
# normalize features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)

n_steps = 10
n_features = 8

def split_sequences(sequences, n_steps):
    X, y = list(), list()
    for i in range(len(sequences)):
        # find the end of this pattern
        end_ix = i + n_steps
        # check if we are beyond the dataset
        if end_ix > len(sequences)-1:
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequences[i:end_ix, :], sequences[end_ix, :]
        X.append(seq_x)
        y.append(seq_y)
    return array(X), array(y)

X, y = split_sequences(sequences=scaled, n_steps=n_steps)
print(X.shape, y.shape)

X = X[:1000, :]
y = y[:1000, :]

# define model
model = Sequential()
model.add(LSTM(50, activation='relu', return_sequences=False, input_shape=(n_steps, n_features)))
#model.add(LSTM(100, activation='relu'))
model.add(Dense(n_features))
model.compile(optimizer='adam', loss='mse')


# fit model
model.fit(X, y, epochs=100, verbose=1)
# demonstrate prediction
x_input = X[0]
x_input = x_input.reshape((1, n_steps, n_features))
yhat = model.predict(x_input, verbose=0)
print(yhat)

【讨论】:

以上是关于如何让 Keras LSTM 在多变量设置中对多个时间序列进行预测?的主要内容,如果未能解决你的问题,请参考以下文章

如何在keras中堆叠多个lstm?

教你搭建多变量时间序列预测模型LSTM(附代码数据集)

keras 序贯(Sequential)模型 常见设置

如何选择 LSTM Keras 参数?

如何在 keras 中拟合两个连接 LSTM 的模型?

如何在Keras训练LSTM的初始状态?