7个最新的时间序列分析库介绍和代码示例
Posted deephub
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7个最新的时间序列分析库介绍和代码示例相关的知识,希望对你有一定的参考价值。
时间序列分析包括检查随着时间推移收集的数据点,目的是确定可以为未来预测提供信息的模式和趋势。我们已经介绍过很多个时间序列分析库了,但是随着时间推移,新的库和更新也在不断的出现,所以本文将分享8个目前比较常用的,用于处理时间序列问题的Python库。他们是tsfresh, autots, darts, atspy, kats, sktime, greykite。
1、Tsfresh
Tsfresh在时间序列特征提取和选择方面功能强大。它旨在自动从时间序列数据中提取大量特征,并识别出最相关的特征。Tsfresh支持多种时间序列格式,可用于分类、聚类和回归等各种应用程序。
importpandasaspd
fromtsfreshimportextract_features
fromtsfresh.utilities.dataframe_functionsimportmake_forecasting_frame
# Assume we have a time series dataset `data` with columns "time" and "value"
data=pd.read_csv('data.csv')
# We will use the last 10 points to predict the next point
df_shift, y=make_forecasting_frame(data["value"], kind="value", max_timeshift=10, rolling_direction=1)
# Extract relevant features using tsfresh
X=extract_features(df_shift, column_id="id", column_sort="time", column_value="value", impute_function=impute)
2、AutoTS
autots是另一个用于时间序列预测的Python库:
- 提供了单变量和多变量时间序列预测的各种算法,包括ARIMA, ETS, Prophet和DeepAR。
- 为最佳模型执行自动模型集成。
- 提供了上界和下界的置信区间预测。
- 通过学习最优NaN imputation和异常值去除来处理数据。
fromautots.datasetsimportload_monthly
df_long=load_monthly(long=True)
fromautotsimportAutoTS
model=AutoTS(
forecast_length=3,
frequency='infer',
ensemble='simple',
max_generations=5,
num_validations=2,
)
model=model.fit(df_long, date_col='datetime', value_col='value', id_col='series_id')
# Print the description of the best model
print(model)
3、darts
darts(Data Analytics and Real-Time Systems)有多种时间序列预测模型,包括ARIMA、Prophet、指数平滑的各种变体,以及各种深度学习模型,如LSTMs、gru和tcn。Darts还具有用于交叉验证、超参数调优和特征工程的内置方法。
darts的一个关键特征是能够进行概率预测。这意味着,不仅可以为每个时间步骤生成单点预测,还可以生成可能结果的分布,从而更全面地理解预测中的不确定性。
importpandasaspd
importmatplotlib.pyplotasplt
fromdartsimportTimeSeries
fromdarts.modelsimportExponentialSmoothing
# Read data
df=pd.read_csv("AirPassengers.csv", delimiter=",")
# Create a TimeSeries, specifying the time and value columns
series=TimeSeries.from_dataframe(df, "Month", "#Passengers")
# Set aside the last 36 months as a validation series
train, val=series[:-36], series[-36:]
# Fit an exponential smoothing model, and make a (probabilistic)
# prediction over the validation series’ duration
model=ExponentialSmoothing()
model.fit(train)
prediction=model.predict(len(val), num_samples=1000)
# Plot the median, 5th and 95th percentiles
series.plot()
prediction.plot(label="forecast", low_quantile=0.05, high_quantile=0.95)
plt.legend()
4、AtsPy
atspy,可以简单地加载数据并指定要测试的模型,如下面的代码所示。
# Importing packages
importpandasaspd
fromatspyimportAutomatedModel
# Reading data
df=pd.read_csv("AirPassengers.csv", delimiter=",")
# Preprocessing data
data.columns= ['month','Passengers']
data['month'] =pd.to_datetime(data['month'],infer_datetime_format=True,format='%y%m')
data.index=data.month
df_air=data.drop(['month'], axis=1)
# Select the models you want to run:
models= ['ARIMA','Prophet']
run_models=AutomatedModel(df=df_air, model_list=models, forecast_len=10)
该包提供了一组完全自动化的模型。包括:
5、kats
kats (kit to Analyze Time Series)是一个由Facebook(现在的Meta)开发的Python库。这个库的三个核心特性是:
模型预测:提供了一套完整的预测工具,包括10+个单独的预测模型、集成、元学习模型、回溯测试、超参数调优和经验预测区间。
检测:Kats支持检测时间序列数据中的各种模式的函数,包括季节性、异常、变化点和缓慢的趋势变化。
特征提取和嵌入:Kats中的时间序列特征(TSFeature)提取模块可以生成65个具有明确统计定义的特征,可应用于大多数机器学习(ML)模型,如分类和回归。
# pip install kats
importpandasaspd
fromkats.constsimportTimeSeriesData
fromkats.models.prophetimportProphetModel, ProphetParams
# Read data
df=pd.read_csv("AirPassengers.csv", names=["time", "passengers"])
# Convert to TimeSeriesData object
air_passengers_ts=TimeSeriesData(air_passengers_df)
# Create a model param instance
params=ProphetParams(seasonality_mode='multiplicative')
# Create a prophet model instance
m=ProphetModel(air_passengers_ts, params)
# Fit model simply by calling m.fit()
m.fit()
# Make prediction for next 30 month
forecast=m.predict(steps=30, freq="MS")
forecast.head()
6、Sktime
sktime是一个用于时间序列分析的库,它构建在scikit-learn之上,并遵循类似的API,可以轻松地在两个库之间切换。下面是如何使用Sktime进行时间序列分类的示例:
fromsktime.datasetsimportload_arrow_head
fromsktime.classification.composeimportTimeSeriesForestClassifier
fromsktime.utils.samplingimporttrain_test_split
# Load ArrowHead dataset
X, y=load_arrow_head(return_X_y=True)
# Split data into train and test sets
X_train, X_test, y_train, y_test=train_test_split(X, y)
# Create and fit a time series forest classifier
classifier=TimeSeriesForestClassifier(n_estimators=100)
classifier.fit(X_train, y_train)
# Predict labels for the test set
y_pred=classifier.predict(X_test)
# Print classification report
fromsklearn.metricsimportclassification_report
print(classification_report(y_test, y_pred))
7、GreyKite
greykite是LinkedIn发布的一个时间序列预测库。该库可以处理复杂的时间序列数据,并提供一系列功能,包括自动化特征工程、探索性数据分析、预测管道和模型调优。
fromgreykite.common.data_loaderimportDataLoader
fromgreykite.framework.templates.autogen.forecast_configimportForecastConfig
fromgreykite.framework.templates.autogen.forecast_configimportMetadataParam
fromgreykite.framework.templates.forecasterimportForecaster
fromgreykite.framework.templates.model_templatesimportModelTemplateEnum
# Defines inputs
df=DataLoader().load_bikesharing().tail(24*90) # Input time series (pandas.DataFrame)
config=ForecastConfig(
metadata_param=MetadataParam(time_col="ts", value_col="count"), # Column names in `df`
model_template=ModelTemplateEnum.AUTO.name, # AUTO model configuration
forecast_horizon=24, # Forecasts 24 steps ahead
coverage=0.95, # 95% prediction intervals
)
# Creates forecasts
forecaster=Forecaster()
result=forecaster.run_forecast_config(df=df, config=config)
# Accesses results
result.forecast # Forecast with metrics, diagnostics
result.backtest # Backtest with metrics, diagnostics
result.grid_search # Time series CV result
result.model # Trained model
result.timeseries # Processed time series with plotting functions
总结
我们可以看到,这些时间序列的库主要功能有2个方向,一个是特征的生成,另外一个就是多种时间序列预测模型的集成,所以无论是处理单变量还是多变量数据,它们都可以满足我们的需求,但是具体用那个还要看具体的需求和使用的习惯。
https://avoid.overfit.cn/post/45451d119a154aeba72bf8dd3eaa9496
作者:Joanna
trove代码的主从切换源代码分析
本篇文章旨在介绍openstack trove对数据库提升一个从库为主库的源码分析,解决大家在使用过程中的遇到不清楚的问题和疑惑。
该功能的原本是针对一个在线的数据库,如果提升配置,主库遇到异常,需要维护的时候,如果快速切换一个从库为主库提供服务。
openstack trove代码的主从切换promote slave to master的源代码分析,
def promote_to_replica_source(self, context, instance_id):
# TODO(atomic77) Promote and eject need to be able to handle the case
# where a datastore like Postgresql needs to treat the slave to be
# promoted differently from the old master and the slaves which will
# be simply reassigned to a new master. See:
# https://bugs.launchpad.net/trove/+bug/1553339
def _promote_to_replica_source(old_master, master_candidate,
replica_models):
# First, we transition from the old master to new as quickly as
# possible to minimize the scope of unrecoverable error
# NOTE(zhaochao): we cannot reattach the old master to the new
# one immediately after the new master is up, because for MariaDB
# the other replicas are still connecting to the old master, and
# during reattaching the old master as a slave, new GTID may be
# created and synced to the replicas. After that, when attaching
# the replicas to the new master, ‘START SLAVE‘ will fail by
# ‘fatal error 1236‘ if the binlog of the replica diverged from
# the new master. So the proper order should be:
# -1. make the old master read only (and detach floating ips)
# -2. make sure the new master is up-to-date
# -3. detach the new master from the old one
# -4. enable the new master (and attach floating ips)
# -5. attach the other replicas to the new master
# -6. attach the old master to the new one
# (and attach floating ips)
# -7. demote the old master
# What we changed here is the order of the 6th step, previously
# this step took place right after step 4, which causes failures
# with MariaDB replications.
old_master.make_read_only(True)
master_ips = old_master.detach_public_ips()
slave_ips = master_candidate.detach_public_ips()
latest_txn_id = old_master.get_latest_txn_id()
master_candidate.wait_for_txn(latest_txn_id)
master_candidate.detach_replica(old_master, for_failover=True)
master_candidate.enable_as_master()
master_candidate.attach_public_ips(master_ips)
master_candidate.make_read_only(False)
# At this point, should something go wrong, there
# should be a working master with some number of working slaves,
# and possibly some number of "orphaned" slaves
exception_replicas = []
error_messages = ""
for replica in replica_models:
try:
if replica.id != master_candidate.id:
replica.detach_replica(old_master, for_failover=True)
replica.attach_replica(master_candidate)
except exception.TroveError as ex:
log_fmt = ("Unable to migrate replica %(slave)s from "
"old replica source %(old_master)s to "
"new source %(new_master)s on promote.")
exc_fmt = _("Unable to migrate replica %(slave)s from "
"old replica source %(old_master)s to "
"new source %(new_master)s on promote.")
msg_content = {
"slave": replica.id,
"old_master": old_master.id,
"new_master": master_candidate.id}
LOG.exception(log_fmt, msg_content)
exception_replicas.append(replica)
error_messages += "%s (%s)
" % (
exc_fmt % msg_content, ex)
# dealing with the old master after all the other replicas
# has been migrated.
old_master.attach_replica(master_candidate)
old_master.attach_public_ips(slave_ips)
try:
old_master.demote_replication_master()
except Exception as ex:
log_fmt = "Exception demoting old replica source %s."
exc_fmt = _("Exception demoting old replica source %s.")
LOG.exception(log_fmt, old_master.id)
exception_replicas.append(old_master)
error_messages += "%s (%s)
" % (
exc_fmt % old_master.id, ex)
self._set_task_status([old_master] + replica_models,
InstanceTasks.NONE)
if exception_replicas:
self._set_task_status(exception_replicas,
InstanceTasks.PROMOTION_ERROR)
msg = (_("promote-to-replica-source %(id)s: The following "
"replicas may not have been switched: %(replicas)s:"
"
%(err)s") %
{"id": master_candidate.id,
"replicas": [repl.id for repl in exception_replicas],
"err": error_messages})
raise ReplicationSlaveAttachError(msg)
with EndNotification(context):
master_candidate = BuiltInstanceTasks.load(context, instance_id)
old_master = BuiltInstanceTasks.load(context,
master_candidate.slave_of_id)
replicas = []
for replica_dbinfo in old_master.slaves:
if replica_dbinfo.id == instance_id:
replica = master_candidate
else:
replica = BuiltInstanceTasks.load(context,
replica_dbinfo.id)
replicas.append(replica)
try:
_promote_to_replica_source(old_master, master_candidate,
replicas)
except ReplicationSlaveAttachError:
raise
except Exception:
self._set_task_status([old_master] + replicas,
InstanceTasks.PROMOTION_ERROR)
raise
切换流程如下:
主从切换适合:一主多从的结构,而且绑定了floating_ip的情况。
- 根据要切换的slave_id获取实例作为新的maste;
- 将所有slave包括现在的作为新master的slave_id,添加到一个replicas列表
-
根据旧的master_id获取旧的master实例
以下开始进入正式切换步骤:_promote_to_replica_source(old_master, master_candidate,replicas)
注意: 以下说的公网IP是指floating ip - 设置旧的maste实例为只读
- 旧master 实例detach公网ip
- 新的master实例detach公网ip
- 获取旧master实例的同步位置
- 等待新的master实例同步到相同位置
- 解除新旧master实例的主从关系
- 新的master实例启用为master实例。
- 新的maste实例attache公网IP
- 新的master设置read_only 为flase,例如mysql: set global read_only = False
获取获取slave实例,如果slave实例的id不等于新的maste实例的id。
旧的maste解除主从关系
attache旧的slave到新的maste实例上
旧的maste实例和新的maste实例建立主从关系
旧的maste实例添加公网ip
以上是关于7个最新的时间序列分析库介绍和代码示例的主要内容,如果未能解决你的问题,请参考以下文章
Python可视化应用实战案例30篇-基础绘图命令详解含大量示例代码(附Python代码)