如何让 Keras LSTM 在多变量设置中对多个时间序列进行预测?
Posted
技术标签:
【中文标题】如何让 Keras LSTM 在多变量设置中对多个时间序列进行预测?【英文标题】:How to have Keras LSTM make predictions for multiple time-series in a multivariate setting? 【发布时间】:2020-06-29 03:21:48 【问题描述】:我正在尝试使用 Keras 同时对多个变量进行预测。参考 this example here,我想预测所有特征的值(包括 pm 2.5、DEWP、TEMP 等),而不仅仅是污染(pm 2.5)。本质上,就是建立一个模型:以所有变量作为输入,把所有变量都作为时间序列来预测,而不是只预测其中一个变量。
我把数据重塑(reshape)为 3D 后修改了原始示例代码,但运行时出现错误。代码如下:
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from math import sqrt
from numpy import concatenate
from matplotlib import pyplot
from pandas import read_csv
from pandas import DataFrame
from pandas import concat
from pandas import read_csv, DataFrame, concat
from datetime import datetime
# load data
def parse(x):
    """Convert a 'year month day hour' string into a ``datetime``.

    Passed to ``read_csv`` as the ``date_parser`` callback so that the four
    separate date columns of the raw file become a single timestamp.

    Args:
        x: String with space-separated year, month, day and hour fields,
            e.g. ``'2010 1 2 0'``.

    Returns:
        The corresponding naive ``datetime`` (minutes and seconds are zero).

    Raises:
        ValueError: If ``x`` does not match the ``'%Y %m %d %H'`` format.
    """
    return datetime.strptime(x, '%Y %m %d %H')
# Read the raw file, merging the year/month/day/hour columns into one
# datetime index via `parse`.
# NOTE(review): nested `parse_dates` lists and `date_parser` are deprecated in
# recent pandas releases; on newer versions build the index with
# `pandas.to_datetime` after loading instead.
dataset = read_csv('raw.csv', parse_dates = [['year', 'month', 'day', 'hour']], index_col=0, date_parser=parse)
dataset.drop('No', axis=1, inplace=True)
# manually specify column names
dataset.columns = ['pollution', 'dew', 'temp', 'press', 'wnd_dir', 'wnd_spd', 'snow', 'rain']
dataset.index.name = 'date'
# Replace missing values in the 'pollution' column with 0.
# Assign the result back instead of calling fillna(inplace=True) on the
# column selection: the chained in-place form warns in pandas 2.x and does
# not modify `dataset` when Copy-on-Write is enabled.
dataset['pollution'] = dataset['pollution'].fillna(0)
# drop the first 24 hours
dataset = dataset[24:]
# summarize first 5 rows
print(dataset.head(5))
# save to file
dataset.to_csv('pollution.csv')
# convert series to supervised learning
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """Frame a time series as a supervised-learning table.

    Each output row holds the ``n_in`` lagged observations followed by the
    current observation and ``n_out - 1`` future observations.

    Args:
        data: Sequence of observations; a flat list / 1-D array (one
            variable) or a 2-D array / list of rows (several variables).
        n_in: Number of lag steps (t-n_in ... t-1) to use as input columns.
        n_out: Number of forecast steps (t ... t+n_out-1) to use as output
            columns.
        dropnan: If true, drop rows made incomplete by the shifting.

    Returns:
        A ``DataFrame`` whose columns are named ``var<j>(t-<i>)``,
        ``var<j>(t)`` and ``var<j>(t+<i>)``.
    """
    df = DataFrame(data)
    # Take the variable count from the frame itself so that lists of rows and
    # 1-D arrays are handled as well as flat lists and 2-D arrays.
    n_vars = df.shape[1]
    cols, names = [], []
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += ['var%d(t-%d)' % (j + 1, i) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    for i in range(n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += ['var%d(t)' % (j + 1) for j in range(n_vars)]
        else:
            names += ['var%d(t+%d)' % (j + 1, i) for j in range(n_vars)]
    # put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values introduced at the edges by shift()
    if dropnan:
        agg = agg.dropna()
    return agg
# Preprocessing for the question's experiment: encode, scale, frame as a
# supervised problem, split into train/test and reshape for the LSTM.
# load dataset
dataset = read_csv('pollution.csv', header=0, index_col=0)
values = dataset.values
# integer encode direction
# (column 4 is 'wnd_dir' given the column order assigned before saving)
encoder = LabelEncoder()
values[:,4] = encoder.fit_transform(values[:,4])
# ensure all data is float
values = values.astype('float32')
# normalize features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# frame as supervised learning
# one lag step and one forecast step: columns are all variables at t-1
# followed by all variables at t
reframed = series_to_supervised(scaled, 1, 1)
# split into train and test sets
values = reframed.values
# the first 365 * 24 rows are used for training, the rest for testing
n_train_hours = 365 * 24
train = values[:n_train_hours, :]
test = values[n_train_hours:, :]
# split into input and outputs
# the last 8 columns (every variable at t) are the targets; the columns
# before them (every variable at t-1) are the inputs
train_X, train_y = train[:, :-8], train[:, -8:]
test_X, test_y = test[:, :-8], test[:, -8:]
# reshape input to be 3D [samples, timesteps, features]
train_X_3d = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
test_X_3d = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
# the targets are reshaped to 3D [samples, 1, features] as well
train_y_3d = train_y.reshape((train_y.shape[0], 1, train_y.shape[1]))
test_y_3d = test_y.reshape((test_y.shape[0], 1, test_y.shape[1]))
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
print(train_X_3d.shape, train_y_3d.shape, test_X_3d.shape, test_y_3d.shape)
# design network
# NOTE(review): the model ends at the LSTM layer (the Dense layer below is
# commented out), so it outputs 50 values per sample, while the targets
# passed to fit() have shape [samples, 1, 8]. This is the mismatch reported
# by the error "Dimensions must be equal, but are 50 and 8 ... input shapes:
# [?,50], [?,1,8]".
model = Sequential()
model.add(LSTM(50, input_shape=(train_X_3d.shape[1], train_X_3d.shape[2])))
#model.add(Dense(1))
model.compile(loss='mae', optimizer='adam')
# fit network
# shuffle=False keeps the samples in their original order during training
history = model.fit(train_X_3d, train_y_3d,
epochs=50, batch_size=72,
validation_data=(test_X_3d, test_y_3d), verbose=2, shuffle=False)
# plot history
pyplot.plot(history.history['loss'], label='train')
pyplot.plot(history.history['val_loss'], label='test')
pyplot.legend()
pyplot.show()
# make a prediction
yhat = model.predict(test_X_3d)
# bare expression: displays the predictions in an interactive session
yhat
输出是:
pollution dew temp press wnd_dir wnd_spd snow rain
date
2010-01-02 00:00:00 129.0 -16 -4.0 1020.0 SE 1.79 0 0
2010-01-02 01:00:00 148.0 -15 -4.0 1020.0 SE 2.68 0 0
2010-01-02 02:00:00 159.0 -11 -5.0 1021.0 SE 3.57 0 0
2010-01-02 03:00:00 181.0 -7 -5.0 1022.0 SE 5.36 1 0
2010-01-02 04:00:00 138.0 -7 -5.0 1022.0 SE 6.25 2 0
(8760, 8) (8760, 8) (35039, 8) (35039, 8)
(8760, 1, 8) (8760, 1, 8) (35039, 1, 8) (35039, 1, 8)
Train on 8760 samples, validate on 35039 samples
Epoch 1/50
---------------------------------------------------------------------------
InvalidArgumentError Traceback (most recent call last)
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in _create_c_op(graph, node_def, inputs, control_inputs)
1609 try:
-> 1610 c_op = c_api.TF_FinishOperation(op_desc)
1611 except errors.InvalidArgumentError as e:
InvalidArgumentError: Dimensions must be equal, but are 50 and 8 for 'loss/lstm_loss/sub' (op: 'Sub') with input shapes: [?,50], [?,1,8].
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-1-00c079ab5854> in <module>
97 history = model.fit(train_X_3d, train_y_3d,
98 epochs=50, batch_size=72,
---> 99 validation_data=(test_X_3d, test_y_3d), verbose=2, shuffle=False)
100 # plot history
101 pyplot.plot(history.history['loss'], label='train')
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
726 max_queue_size=max_queue_size,
727 workers=workers,
--> 728 use_multiprocessing=use_multiprocessing)
729
730 def evaluate(self,
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
322 mode=ModeKeys.TRAIN,
323 training_context=training_context,
--> 324 total_epochs=epochs)
325 cbks.make_logs(model, epoch_logs, training_result, ModeKeys.TRAIN)
326
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in run_one_epoch(model, iterator, execution_function, dataset_size, batch_size, strategy, steps_per_epoch, num_samples, mode, training_context, total_epochs)
121 step=step, mode=mode, size=current_batch_size) as batch_logs:
122 try:
--> 123 batch_outs = execution_function(iterator)
124 except (StopIteration, errors.OutOfRangeError):
125 # TODO(kaftan): File bug about tf function and errors.OutOfRangeError?
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in execution_function(input_fn)
84 # `numpy` translates Tensors to values in Eager mode.
85 return nest.map_structure(_non_none_constant_value,
---> 86 distributed_function(input_fn))
87
88 return execution_function
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in __call__(self, *args, **kwds)
455
456 tracing_count = self._get_tracing_count()
--> 457 result = self._call(*args, **kwds)
458 if tracing_count == self._get_tracing_count():
459 self._call_counter.called_without_tracing()
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _call(self, *args, **kwds)
501 # This is the first call of __call__, so we have to initialize.
502 initializer_map = object_identity.ObjectIdentityDictionary()
--> 503 self._initialize(args, kwds, add_initializers_to=initializer_map)
504 finally:
505 # At this point we know that the initialization is complete (or less
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
406 self._concrete_stateful_fn = (
407 self._stateful_fn._get_concrete_function_internal_garbage_collected( # pylint: disable=protected-access
--> 408 *args, **kwds))
409
410 def invalid_creator_scope(*unused_args, **unused_kwds):
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
1846 if self.input_signature:
1847 args, kwargs = None, None
-> 1848 graph_function, _, _ = self._maybe_define_function(args, kwargs)
1849 return graph_function
1850
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _maybe_define_function(self, args, kwargs)
2148 graph_function = self._function_cache.primary.get(cache_key, None)
2149 if graph_function is None:
-> 2150 graph_function = self._create_graph_function(args, kwargs)
2151 self._function_cache.primary[cache_key] = graph_function
2152 return graph_function, args, kwargs
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
2039 arg_names=arg_names,
2040 override_flat_arg_shapes=override_flat_arg_shapes,
-> 2041 capture_by_value=self._capture_by_value),
2042 self._function_attributes,
2043 # Tell the ConcreteFunction to clean up its graph once it goes out of
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
913 converted_func)
914
--> 915 func_outputs = python_func(*func_args, **func_kwargs)
916
917 # invariant: `func_outputs` contains only Tensors, CompositeTensors,
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in wrapped_fn(*args, **kwds)
356 # __wrapped__ allows AutoGraph to swap in a converted function. We give
357 # the function a weak reference to itself to avoid a reference cycle.
--> 358 return weak_wrapped_fn().__wrapped__(*args, **kwds)
359 weak_wrapped_fn = weakref.ref(wrapped_fn)
360
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in distributed_function(input_iterator)
71 strategy = distribution_strategy_context.get_strategy()
72 outputs = strategy.experimental_run_v2(
---> 73 per_replica_function, args=(model, x, y, sample_weights))
74 # Out of PerReplica outputs reduce or pick values to return.
75 all_outputs = dist_utils.unwrap_output_dict(
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in experimental_run_v2(self, fn, args, kwargs)
758 fn = autograph.tf_convert(fn, ag_ctx.control_status_ctx(),
759 convert_by_default=False)
--> 760 return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
761
762 def reduce(self, reduce_op, value, axis):
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in call_for_each_replica(self, fn, args, kwargs)
1785 kwargs = {}
1786 with self._container_strategy().scope():
-> 1787 return self._call_for_each_replica(fn, args, kwargs)
1788
1789 def _call_for_each_replica(self, fn, args, kwargs):
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in _call_for_each_replica(self, fn, args, kwargs)
2130 self._container_strategy(),
2131 replica_id_in_sync_group=constant_op.constant(0, dtypes.int32)):
-> 2132 return fn(*args, **kwargs)
2133
2134 def _reduce_to(self, reduce_op, value, destinations):
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/autograph/impl/api.py in wrapper(*args, **kwargs)
290 def wrapper(*args, **kwargs):
291 with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.DISABLED):
--> 292 return func(*args, **kwargs)
293
294 if inspect.isfunction(func) or inspect.ismethod(func):
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in train_on_batch(model, x, y, sample_weight, class_weight, reset_metrics)
262 y,
263 sample_weights=sample_weights,
--> 264 output_loss_metrics=model._output_loss_metrics)
265
266 if reset_metrics:
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in train_on_batch(model, inputs, targets, sample_weights, output_loss_metrics)
309 sample_weights=sample_weights,
310 training=True,
--> 311 output_loss_metrics=output_loss_metrics))
312 if not isinstance(outs, list):
313 outs = [outs]
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _process_single_batch(model, inputs, targets, output_loss_metrics, sample_weights, training)
250 output_loss_metrics=output_loss_metrics,
251 sample_weights=sample_weights,
--> 252 training=training))
253 if total_loss is None:
254 raise ValueError('The model cannot be run '
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _model_loss(model, inputs, targets, output_loss_metrics, sample_weights, training)
164
165 if hasattr(loss_fn, 'reduction'):
--> 166 per_sample_losses = loss_fn.call(targets[i], outs[i])
167 weighted_losses = losses_utils.compute_weighted_loss(
168 per_sample_losses,
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/losses.py in call(self, y_true, y_pred)
219 y_pred, y_true = tf_losses_util.squeeze_or_expand_dimensions(
220 y_pred, y_true)
--> 221 return self.fn(y_true, y_pred, **self._fn_kwargs)
222
223 def get_config(self):
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/keras/losses.py in mean_absolute_error(y_true, y_pred)
781 y_pred = ops.convert_to_tensor(y_pred)
782 y_true = math_ops.cast(y_true, y_pred.dtype)
--> 783 return K.mean(math_ops.abs(y_pred - y_true), axis=-1)
784
785
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/ops/math_ops.py in binary_op_wrapper(x, y)
897 with ops.name_scope(None, op_name, [x, y]) as name:
898 if isinstance(x, ops.Tensor) and isinstance(y, ops.Tensor):
--> 899 return func(x, y, name=name)
900 elif not isinstance(y, sparse_tensor.SparseTensor):
901 try:
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/ops/gen_math_ops.py in sub(x, y, name)
11086 # Add nodes to the TensorFlow graph.
11087 _, _, _op = _op_def_lib._apply_op_helper(
> 11088 "Sub", x=x, y=y, name=name)
11089 _result = _op.outputs[:]
11090 _inputs_flat = _op.inputs
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/op_def_library.py in _apply_op_helper(self, op_type_name, name, **keywords)
791 op = g.create_op(op_type_name, inputs, dtypes=None, name=scope,
792 input_types=input_types, attrs=attr_protos,
--> 793 op_def=op_def)
794 return output_structure, op_def.is_stateful, op
795
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in create_op(***failed resolving arguments***)
546 return super(FuncGraph, self)._create_op_internal( # pylint: disable=protected-access
547 op_type, inputs, dtypes, input_types, name, attrs, op_def,
--> 548 compute_device)
549
550 def capture(self, tensor, name=None):
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in _create_op_internal(self, op_type, inputs, dtypes, input_types, name, attrs, op_def, compute_device)
3427 input_types=input_types,
3428 original_op=self._default_original_op,
-> 3429 op_def=op_def)
3430 self._create_op_helper(ret, compute_device=compute_device)
3431 return ret
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in __init__(self, node_def, g, inputs, output_types, control_inputs, input_types, original_op, op_def)
1771 op_def, inputs, node_def.attr)
1772 self._c_op = _create_c_op(self._graph, node_def, grouped_inputs,
-> 1773 control_input_ops)
1774 # pylint: enable=protected-access
1775
~/anaconda3/envs/topic_forecaster/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py in _create_c_op(graph, node_def, inputs, control_inputs)
1611 except errors.InvalidArgumentError as e:
1612 # Convert to ValueError for backwards compatibility.
-> 1613 raise ValueError(str(e))
1614
1615 return c_op
ValueError: Dimensions must be equal, but are 50 and 8 for 'loss/lstm_loss/sub' (op: 'Sub') with input shapes: [?,50], [?,1,8].
数据集可用here。
我的问题是:
Keras LSTM 层是不是并非为此而设计?也许它只适用于单个时间序列? 如果 LSTM 可以胜任,有什么好的解决办法? 如果 LSTM 不适合,而我又想对特征之间的相互作用进行建模并同时预测所有特征,应该使用哪些其他方法?谢谢!
【问题讨论】:
【参考方案1】:我找到了解决方案 here(在“多个并行系列”一节下)。只需要重塑特征和标签再输入网络,就可以工作了!每个样本的特征应具有 (n_steps, n_features) 的形状,即整个输入数组的形状为 (n_samples, n_steps, n_features);而标签应具有 (n_samples, n_features) 的形状(如果我们只预测 1 个时间步)。
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from math import sqrt
from numpy import array, concatenate
from matplotlib import pyplot
from pandas import read_csv
from pandas import DataFrame
from pandas import concat
from pandas import read_csv, DataFrame, concat
from datetime import datetime
# load data
def parse(x):
    """Convert a 'year month day hour' string into a ``datetime``.

    Passed to ``read_csv`` as the ``date_parser`` callback so that the four
    separate date columns of the raw file become a single timestamp.

    Args:
        x: String with space-separated year, month, day and hour fields,
            e.g. ``'2010 1 2 0'``.

    Returns:
        The corresponding naive ``datetime`` (minutes and seconds are zero).

    Raises:
        ValueError: If ``x`` does not match the ``'%Y %m %d %H'`` format.
    """
    return datetime.strptime(x, '%Y %m %d %H')
# Read the raw file, merging the year/month/day/hour columns into one
# datetime index via `parse`.
# NOTE(review): nested `parse_dates` lists and `date_parser` are deprecated in
# recent pandas releases; on newer versions build the index with
# `pandas.to_datetime` after loading instead.
dataset = read_csv('raw.csv', parse_dates = [['year', 'month', 'day', 'hour']], index_col=0, date_parser=parse)
dataset.drop('No', axis=1, inplace=True)
# manually specify column names
dataset.columns = ['pollution', 'dew', 'temp', 'press', 'wnd_dir', 'wnd_spd', 'snow', 'rain']
dataset.index.name = 'date'
# Replace missing values in the 'pollution' column with 0.
# Assign the result back instead of calling fillna(inplace=True) on the
# column selection: the chained in-place form warns in pandas 2.x and does
# not modify `dataset` when Copy-on-Write is enabled.
dataset['pollution'] = dataset['pollution'].fillna(0)
# drop the first 24 hours
dataset = dataset[24:]
# summarize first 5 rows
print(dataset.head(5))
# save to file
dataset.to_csv('pollution.csv')
# load dataset
dataset = read_csv('pollution.csv', header=0, index_col=0)
values = dataset.values
# integer encode direction
# (column 4 is 'wnd_dir' given the column order assigned before saving)
encoder = LabelEncoder()
values[:,4] = encoder.fit_transform(values[:,4])
# ensure all data is float
values = values.astype('float32')
# normalize features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# number of past time steps fed to the network for each sample
n_steps = 10
# number of parallel series; taken from the data (8 for this dataset) so the
# script keeps working if columns are added or removed
n_features = scaled.shape[1]
def split_sequences(sequences, n_steps):
    """Split a multivariate series into sliding-window samples.

    Every sample consists of ``n_steps`` consecutive rows as input and the
    row immediately after that window as the target.

    Args:
        sequences: 2-D array with one row per time step and one column per
            series.
        n_steps: Number of consecutive time steps in each input window.

    Returns:
        A tuple ``(X, y)`` of arrays. With ``m = len(sequences) - n_steps``
        samples, ``X`` has shape ``(m, n_steps, n_columns)`` and ``y`` has
        shape ``(m, n_columns)``. Both are empty arrays when the series is
        not longer than ``n_steps``.
    """
    X, y = [], []
    # A window starting at `start` needs row `start + n_steps` as its target,
    # so the last usable start index is len(sequences) - n_steps - 1.
    for start in range(len(sequences) - n_steps):
        end_ix = start + n_steps
        X.append(sequences[start:end_ix, :])
        y.append(sequences[end_ix, :])
    return array(X), array(y)
# Build the sliding-window samples: X is [samples, n_steps, n_features] and
# y is [samples, n_features] (the row following each window).
X, y = split_sequences(sequences=scaled, n_steps=n_steps)
print(X.shape, y.shape)
# keep only the first 1000 samples
# (presumably to keep the demonstration fast; confirm before real training)
X = X[:1000, :]
y = y[:1000, :]
# define model
# the LSTM returns only its final output (return_sequences=False) and the
# Dense layer maps it to n_features values, matching the shape of y
model = Sequential()
model.add(LSTM(50, activation='relu', return_sequences=False, input_shape=(n_steps, n_features)))
#model.add(LSTM(100, activation='relu'))
model.add(Dense(n_features))
model.compile(optimizer='adam', loss='mse')
# fit model
model.fit(X, y, epochs=100, verbose=1)
# demonstrate prediction
# take the first window and add a leading batch dimension of size 1
x_input = X[0]
x_input = x_input.reshape((1, n_steps, n_features))
yhat = model.predict(x_input, verbose=0)
print(yhat)
【讨论】:
以上是关于如何让 Keras LSTM 在多变量设置中对多个时间序列进行预测?的主要内容,如果未能解决你的问题,请参考以下文章