7个最新的时间序列分析库介绍和代码示例

Posted deephub

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7个最新的时间序列分析库介绍和代码示例相关的知识,希望对你有一定的参考价值。

时间序列分析包括检查随着时间推移收集的数据点,目的是确定可以为未来预测提供信息的模式和趋势。我们已经介绍过很多个时间序列分析库了,但是随着时间推移,新的库和更新也在不断的出现,所以本文将分享8个目前比较常用的,用于处理时间序列问题的Python库。他们是tsfresh, autots, darts, atspy, kats, sktime, greykite。

1、Tsfresh

Tsfresh在时间序列特征提取和选择方面功能强大。它旨在自动从时间序列数据中提取大量特征,并识别出最相关的特征。Tsfresh支持多种时间序列格式,可用于分类、聚类和回归等各种应用程序。

 importpandasaspd
 fromtsfreshimportextract_features
 fromtsfresh.utilities.dataframe_functionsimportmake_forecasting_frame
 
 # Assume we have a time series dataset `data` with columns "time" and "value"
 data=pd.read_csv('data.csv')
 
 # We will use the last 10 points to predict the next point
 df_shift, y=make_forecasting_frame(data["value"], kind="value", max_timeshift=10, rolling_direction=1)
 
 # Extract relevant features using tsfresh
 X=extract_features(df_shift, column_id="id", column_sort="time", column_value="value", impute_function=impute)

2、AutoTS

autots是另一个用于时间序列预测的Python库:

  • 提供了单变量和多变量时间序列预测的各种算法,包括ARIMA, ETS, Prophet和DeepAR。
  • 为最佳模型执行自动模型集成。
  • 提供了上界和下界的置信区间预测。
  • 通过学习最优NaN imputation和异常值去除来处理数据。
 fromautots.datasetsimportload_monthly
 
 df_long=load_monthly(long=True)
 
 fromautotsimportAutoTS
 
 model=AutoTS(
     forecast_length=3,
     frequency='infer',
     ensemble='simple',
     max_generations=5,
     num_validations=2,
 )
 model=model.fit(df_long, date_col='datetime', value_col='value', id_col='series_id')
 
 # Print the description of the best model
 print(model)

3、darts

darts(Data Analytics and Real-Time Systems)有多种时间序列预测模型,包括ARIMA、Prophet、指数平滑的各种变体,以及各种深度学习模型,如LSTMs、gru和tcn。Darts还具有用于交叉验证、超参数调优和特征工程的内置方法。

darts的一个关键特征是能够进行概率预测。这意味着,不仅可以为每个时间步骤生成单点预测,还可以生成可能结果的分布,从而更全面地理解预测中的不确定性。

 importpandasaspd
 importmatplotlib.pyplotasplt
 
 fromdartsimportTimeSeries
 fromdarts.modelsimportExponentialSmoothing
 
 # Read data
 df=pd.read_csv("AirPassengers.csv", delimiter=",")
 
 # Create a TimeSeries, specifying the time and value columns
 series=TimeSeries.from_dataframe(df, "Month", "#Passengers")
 
 # Set aside the last 36 months as a validation series
 train, val=series[:-36], series[-36:]
 
 # Fit an exponential smoothing model, and make a (probabilistic) 
 # prediction over the validation series’ duration
 model=ExponentialSmoothing()
 model.fit(train)
 prediction=model.predict(len(val), num_samples=1000)
 
 # Plot the median, 5th and 95th percentiles
 series.plot()
 prediction.plot(label="forecast", low_quantile=0.05, high_quantile=0.95)
 plt.legend()

4、AtsPy

atspy,可以简单地加载数据并指定要测试的模型,如下面的代码所示。

 # Importing packages
 importpandasaspd
 fromatspyimportAutomatedModel
 
 # Reading data
 df=pd.read_csv("AirPassengers.csv", delimiter=",")
 
 # Preprocessing data 
 data.columns= ['month','Passengers']
 data['month'] =pd.to_datetime(data['month'],infer_datetime_format=True,format='%y%m')
 data.index=data.month
 df_air=data.drop(['month'], axis=1)
 
 # Select the models you want to run:
 models= ['ARIMA','Prophet']
 run_models=AutomatedModel(df=df_air, model_list=models, forecast_len=10)

该包提供了一组完全自动化的模型。包括:

5、kats

kats (kit to Analyze Time Series)是一个由Facebook(现在的Meta)开发的Python库。这个库的三个核心特性是:

模型预测:提供了一套完整的预测工具,包括10+个单独的预测模型、集成、元学习模型、回溯测试、超参数调优和经验预测区间。

检测:Kats支持检测时间序列数据中的各种模式的函数,包括季节性、异常、变化点和缓慢的趋势变化。

特征提取和嵌入:Kats中的时间序列特征(TSFeature)提取模块可以生成65个具有明确统计定义的特征,可应用于大多数机器学习(ML)模型,如分类和回归。

 # pip install kats
 
 importpandasaspd
 fromkats.constsimportTimeSeriesData
 fromkats.models.prophetimportProphetModel, ProphetParams
 
 # Read data
 df=pd.read_csv("AirPassengers.csv", names=["time", "passengers"])
 
 # Convert to TimeSeriesData object
 air_passengers_ts=TimeSeriesData(air_passengers_df)
 
 # Create a model param instance
 params=ProphetParams(seasonality_mode='multiplicative')
 
 # Create a prophet model instance
 m=ProphetModel(air_passengers_ts, params)
 
 # Fit model simply by calling m.fit()
 m.fit()
 
 # Make prediction for next 30 month
 forecast=m.predict(steps=30, freq="MS")
 forecast.head()

6、Sktime

sktime是一个用于时间序列分析的库,它构建在scikit-learn之上,并遵循类似的API,可以轻松地在两个库之间切换。下面是如何使用Sktime进行时间序列分类的示例:

 fromsktime.datasetsimportload_arrow_head
 fromsktime.classification.composeimportTimeSeriesForestClassifier
 fromsktime.utils.samplingimporttrain_test_split
 
 # Load ArrowHead dataset
 X, y=load_arrow_head(return_X_y=True)
 
 # Split data into train and test sets
 X_train, X_test, y_train, y_test=train_test_split(X, y)
 
 # Create and fit a time series forest classifier
 classifier=TimeSeriesForestClassifier(n_estimators=100)
 classifier.fit(X_train, y_train)
 
 # Predict labels for the test set
 y_pred=classifier.predict(X_test)
 
 # Print classification report
 fromsklearn.metricsimportclassification_report
 print(classification_report(y_test, y_pred))

7、GreyKite

greykite是LinkedIn发布的一个时间序列预测库。该库可以处理复杂的时间序列数据,并提供一系列功能,包括自动化特征工程、探索性数据分析、预测管道和模型调优。

 fromgreykite.common.data_loaderimportDataLoader
 fromgreykite.framework.templates.autogen.forecast_configimportForecastConfig
 fromgreykite.framework.templates.autogen.forecast_configimportMetadataParam
 fromgreykite.framework.templates.forecasterimportForecaster
 fromgreykite.framework.templates.model_templatesimportModelTemplateEnum
 
 # Defines inputs
 df=DataLoader().load_bikesharing().tail(24*90)  # Input time series (pandas.DataFrame)
 config=ForecastConfig(
      metadata_param=MetadataParam(time_col="ts", value_col="count"),  # Column names in `df`
      model_template=ModelTemplateEnum.AUTO.name,  # AUTO model configuration
      forecast_horizon=24,   # Forecasts 24 steps ahead
      coverage=0.95,         # 95% prediction intervals
  )
 
 # Creates forecasts
 forecaster=Forecaster()
 result=forecaster.run_forecast_config(df=df, config=config)
 
 # Accesses results
 result.forecast     # Forecast with metrics, diagnostics
 result.backtest     # Backtest with metrics, diagnostics
 result.grid_search  # Time series CV result
 result.model        # Trained model
 result.timeseries   # Processed time series with plotting functions

总结

我们可以看到,这些时间序列的库主要功能有2个方向,一个是特征的生成,另外一个就是多种时间序列预测模型的集成,所以无论是处理单变量还是多变量数据,它们都可以满足我们的需求,但是具体用那个还要看具体的需求和使用的习惯。

https://avoid.overfit.cn/post/45451d119a154aeba72bf8dd3eaa9496

作者:Joanna

trove代码的主从切换源代码分析

本篇文章旨在介绍openstack trove对数据库提升一个从库为主库的源码分析,解决大家在使用过程中的遇到不清楚的问题和疑惑。
该功能的原本是针对一个在线的数据库,如果提升配置,主库遇到异常,需要维护的时候,如果快速切换一个从库为主库提供服务。
openstack trove代码的主从切换promote slave to master的源代码分析,

    def promote_to_replica_source(self, context, instance_id):
        # TODO(atomic77) Promote and eject need to be able to handle the case
        # where a datastore like Postgresql needs to treat the slave to be
        # promoted differently from the old master and the slaves which will
        # be simply reassigned to a new master. See:
        # https://bugs.launchpad.net/trove/+bug/1553339

        def _promote_to_replica_source(old_master, master_candidate,
                                       replica_models):
            # First, we transition from the old master to new as quickly as
            # possible to minimize the scope of unrecoverable error

            # NOTE(zhaochao): we cannot reattach the old master to the new
            # one immediately after the new master is up, because for MariaDB
            # the other replicas are still connecting to the old master, and
            # during reattaching the old master as a slave, new GTID may be
            # created and synced to the replicas. After that, when attaching
            # the replicas to the new master, ‘START SLAVE‘ will fail by
            # ‘fatal error 1236‘ if the binlog of the replica diverged from
            # the new master. So the proper order should be:
            # -1. make the old master read only (and detach floating ips)
            # -2. make sure the new master is up-to-date
            # -3. detach the new master from the old one
            # -4. enable the new master (and attach floating ips)
            # -5. attach the other replicas to the new master
            # -6. attach the old master to the new one
            #     (and attach floating ips)
            # -7. demote the old master
            # What we changed here is the order of the 6th step, previously
            # this step took place right after step 4, which causes failures
            # with MariaDB replications.
            old_master.make_read_only(True)
            master_ips = old_master.detach_public_ips()
            slave_ips = master_candidate.detach_public_ips()
            latest_txn_id = old_master.get_latest_txn_id()
            master_candidate.wait_for_txn(latest_txn_id)
            master_candidate.detach_replica(old_master, for_failover=True)
            master_candidate.enable_as_master()
            master_candidate.attach_public_ips(master_ips)
            master_candidate.make_read_only(False)

            # At this point, should something go wrong, there
            # should be a working master with some number of working slaves,
            # and possibly some number of "orphaned" slaves

            exception_replicas = []
            error_messages = ""
            for replica in replica_models:
                try:
                    if replica.id != master_candidate.id:
                        replica.detach_replica(old_master, for_failover=True)
                        replica.attach_replica(master_candidate)
                except exception.TroveError as ex:
                    log_fmt = ("Unable to migrate replica %(slave)s from "
                               "old replica source %(old_master)s to "
                               "new source %(new_master)s on promote.")
                    exc_fmt = _("Unable to migrate replica %(slave)s from "
                                "old replica source %(old_master)s to "
                                "new source %(new_master)s on promote.")
                    msg_content = {
                        "slave": replica.id,
                        "old_master": old_master.id,
                        "new_master": master_candidate.id}
                    LOG.exception(log_fmt, msg_content)
                    exception_replicas.append(replica)
                    error_messages += "%s (%s)
" % (
                        exc_fmt % msg_content, ex)

            # dealing with the old master after all the other replicas
            # has been migrated.
            old_master.attach_replica(master_candidate)
            old_master.attach_public_ips(slave_ips)
            try:
                old_master.demote_replication_master()
            except Exception as ex:
                log_fmt = "Exception demoting old replica source %s."
                exc_fmt = _("Exception demoting old replica source %s.")
                LOG.exception(log_fmt, old_master.id)
                exception_replicas.append(old_master)
                error_messages += "%s (%s)
" % (
                    exc_fmt % old_master.id, ex)

            self._set_task_status([old_master] + replica_models,
                                  InstanceTasks.NONE)
            if exception_replicas:
                self._set_task_status(exception_replicas,
                                      InstanceTasks.PROMOTION_ERROR)
                msg = (_("promote-to-replica-source %(id)s: The following "
                         "replicas may not have been switched: %(replicas)s:"
                         "
%(err)s") %
                       {"id": master_candidate.id,
                        "replicas": [repl.id for repl in exception_replicas],
                        "err": error_messages})
                raise ReplicationSlaveAttachError(msg)

        with EndNotification(context):
            master_candidate = BuiltInstanceTasks.load(context, instance_id)
            old_master = BuiltInstanceTasks.load(context,
                                                 master_candidate.slave_of_id)
            replicas = []
            for replica_dbinfo in old_master.slaves:
                if replica_dbinfo.id == instance_id:
                    replica = master_candidate
                else:
                    replica = BuiltInstanceTasks.load(context,
                                                      replica_dbinfo.id)
                replicas.append(replica)

            try:
                _promote_to_replica_source(old_master, master_candidate,
                                           replicas)
            except ReplicationSlaveAttachError:
                raise
            except Exception:
                self._set_task_status([old_master] + replicas,
                                      InstanceTasks.PROMOTION_ERROR)
                raise

切换流程如下:
主从切换适合:一主多从的结构,而且绑定了floating_ip的情况。

  1. 根据要切换的slave_id获取实例作为新的maste;
  2. 将所有slave包括现在的作为新master的slave_id,添加到一个replicas列表
  3. 根据旧的master_id获取旧的master实例
    以下开始进入正式切换步骤:_promote_to_replica_source(old_master, master_candidate,replicas)
    注意: 以下说的公网IP是指floating ip

  4. 设置旧的maste实例为只读
  5. 旧master 实例detach公网ip
  6. 新的master实例detach公网ip
  7. 获取旧master实例的同步位置
  8. 等待新的master实例同步到相同位置
  9. 解除新旧master实例的主从关系
  10. 新的master实例启用为master实例。
  11. 新的maste实例attache公网IP
  12. 新的master设置read_only 为flase,例如mysql: set global read_only = False

获取获取slave实例,如果slave实例的id不等于新的maste实例的id。
旧的maste解除主从关系
attache旧的slave到新的maste实例上
旧的maste实例和新的maste实例建立主从关系
旧的maste实例添加公网ip

以上是关于7个最新的时间序列分析库介绍和代码示例的主要内容,如果未能解决你的问题,请参考以下文章

Python可视化应用实战案例30篇-基础绘图命令详解含大量示例代码(附Python代码)

10个自动EDA库功能介绍:几行代码进行的数据分析靠不靠谱

7个最受瞩目的 Python 库,提升你的开发效率

※python自学7个Python生态系统核心库,你值得拥有

我的第一个微信好友数据分析

python做数据分析-简单库的介绍和运用