使用 TimescaleDB 获取每个组/聚合函数的最新/最新版本的版本化时间序列的有效方法

Posted

技术标签:

【中文标题】使用 TimescaleDB 获取每个组/聚合函数的最新/最新版本的版本化时间序列的有效方法【英文标题】:Efficient way to get latest/newest version of versioned time series per group/aggregate function using TimescaleDB 【发布时间】:2021-12-11 23:09:07 【问题描述】:

我们正在迁移到 TimescaleDB,并迁移了一些包含超过 4 亿行的大型表,其中包含版本化的时间序列预测。

表格结构如下,dt_start_utc 保存预测的实际日期,version_utc 保存预测的发布日期(越新,越接近实际预测日期):

sandbox_cord=# \d+ fc_power_raw_import_normalized
                                    Table "public.fc_power_raw_import_normalized"
        Column        |            Type             | Collation | Nullable | Default | Storage | Stats target | Description
----------------------+-----------------------------+-----------+----------+---------+---------+--------------+-------------
dt_start_utc         | timestamp without time zone |           | not null |         | plain   |              |
fc_id                | integer                     |           | not null |         | plain   |              |
fc_kwh               | integer                     |           |          |         | plain   |              |
fc_power_supplier_id | integer                     |           | not null |         | plain   |              |
fc_power_type_id     | integer                     |           | not null |         | plain   |              |
version_utc          | timestamp without time zone |           | not null |         | plain   |              |
Indexes:
    "fc_power_raw_import_normalized2_pkey" PRIMARY KEY, btree (dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id, version_utc)
    "fc_power_raw_import_normalize2_dt_start_utc_fc_id_fc_power_s_id" btree (dt_start_utc DESC, fc_id, fc_power_supplier_id, fc_power_type_id, version_utc DESC)
    "fc_power_raw_import_normalize2_dt_start_utc_fc_power_supplie_id" btree (dt_start_utc DESC, fc_power_supplier_id, fc_power_type_id)
    "fc_power_raw_import_normalized2_dt_start_utc_idx" btree (dt_start_utc DESC)
    "fc_power_raw_import_normalized2_version_utc_idx" btree (version_utc DESC)
Triggers:
    ts_insert_blocker BEFORE INSERT ON fc_power_raw_import_normalized2 FOR EACH ROW EXECUTE FUNCTION _timescaledb_internal.insert_blocker()
Child tables: _timescaledb_internal._hyper_3_2334_chunk,
            _timescaledb_internal._hyper_3_2335_chunk,
            _timescaledb_internal._hyper_3_2336_chunk,
            _timescaledb_internal._hyper_3_2337_chunk,
            _timescaledb_internal._hyper_3_2338_chunk,
            _timescaledb_internal._hyper_3_2339_chunk
Access method: heap
...

这里有一些值:

sandbox_cord=# SELECT * FROM fc_power_raw_import_normalized ORDER BY fc_id ASC LIMIT 25;
    dt_start_utc     | fc_id | fc_kwh | fc_power_supplier_id | fc_power_type_id |     version_utc
---------------------+-------+--------+----------------------+------------------+---------------------
2020-08-27 00:00:00 |     9 |    167 |                    5 |                1 | 2020-08-23 00:27:03
2020-08-27 00:00:00 |     9 |    150 |                    5 |                1 | 2020-08-23 01:12:37
2020-08-27 00:00:00 |     9 |    132 |                    5 |                1 | 2020-08-23 07:11:42
2020-08-27 00:00:00 |     9 |    144 |                    5 |                1 | 2020-08-23 13:12:11
2020-08-27 00:00:00 |     9 |    161 |                    5 |                1 | 2020-08-23 19:13:05
2020-08-27 00:00:00 |     9 |    166 |                    5 |                1 | 2020-08-24 01:11:53
...

现在我想通过以下查询获得每组时间序列的最新版本 (fc_id):

SELECT *
FROM fc_power_raw_import_normalized
WHERE  (dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id, version_utc) IN (
    SELECT
    dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id, MAX(version_utc) version_utc
    FROM
    fc_power_raw_import_normalized
    WHERE
    dt_start_utc > now() - INTERVAL '2 weeks'
    GROUP BY
    dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id
)
AND dt_start_utc > now() - INTERVAL '2 weeks'
ORDER by fc_id, dt_start_utc, version_utc;

UPDATE 或使用 TimescaleDBs last() 函数进行以下查询:

SELECT
dt_start_utc,
    fc_id,
    fc_power_supplier_id,
    fc_power_type_id,
    last(fc_kwh, version_utc) AS fc_kwh_last
FROM fc_power_raw_import_normalized
WHERE dt_start_utc > now () - INTERVAL '2 weeks'
GROUP BY dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id
ORDER BY dt_start_utc ASC, fc_id ASC;

这会产生:

   dt_start_utc     | fc_id | fc_kwh | fc_power_supplier_id | fc_power_type_id |     version_utc
---------------------+-------+--------+----------------------+------------------+---------------------
2021-10-12 16:45:00 |    19 |     99 |                    4 |                1 | 2021-10-12 13:13:50
2021-10-12 16:45:00 |    19 |     99 |                    4 |                2 | 2021-10-12 13:14:47
2021-10-12 17:00:00 |    19 |    100 |                    4 |                1 | 2021-10-12 13:13:50
2021-10-12 17:00:00 |    19 |    100 |                    4 |                2 | 2021-10-12 13:14:47
2021-10-12 17:15:00 |    19 |    103 |                    4 |                1 | 2021-10-12 13:13:50
2021-10-12 17:15:00 |    19 |    103 |                    4 |                2 | 2021-10-12 13:14:47
2021-10-12 17:30:00 |    19 |    105 |                    4 |                1 | 2021-10-12 13:13:50
2021-10-12 17:30:00 |    19 |    105 |                    4 |                2 | 2021-10-12 13:14:47
2021-10-12 17:45:00 |    19 |    108 |                    4 |                1 | 2021-10-12 13:13:50
2021-10-12 17:45:00 |    19 |    108 |                    4 |                2 | 2021-10-12 13:14:47
2021-10-12 18:00:00 |    19 |    108 |                    4 |                1 | 2021-10-12 13:13:50
2021-10-12 18:00:00 |    19 |    108 |                    4 |                2 | 2021-10-12 13:14:47
2021-10-12 18:15:00 |    19 |    105 |                    4 |                1 | 2021-10-12 13:13:50
2021-10-12 18:15:00 |    19 |    105 |                    4 |                2 | 2021-10-12 13:14:47
2021-10-12 18:30:00 |    19 |     82 |                    4 |                1 | 2021-10-12 18:28:47
2021-10-12 18:30:00 |    19 |     82 |                    4 |                2 | 2021-10-12 18:29:59
2021-10-12 18:45:00 |    19 |     82 |                    4 |                1 | 2021-10-12 18:28:47
2021-10-12 18:45:00 |    19 |     82 |                    4 |                2 | 2021-10-12 18:29:59
2021-10-12 19:00:00 |    19 |     81 |                    4 |                1 | 2021-10-12 18:28:47
...

但这实际上很慢 e。 G。 3 个月的计算时间约为 77 秒,而更长的时间范围则需要更多时间。

我现在使用INNER JOIN 或窗口函数尝试了此查询的不同变体,还根据this article 添加了不同的索引。但是这些方法都没有显着提高性能。

另一种方法是使用continuous aggregates,这需要定义一个时间段(请参阅此问题:Continuous aggregates in postgres/timescaledb requires time_bucket-function?)并且似乎不适合这里。底层块大小是我改变的另一件事,但也没有显着改进。

到目前为止,当涉及到这个表结构和查询时,TimescaleDB 的性能并没有比我们优化的“常规”关系数据库 MariaDB 好多少,我希望专用的时间序列数据库具有更好的默认性能.

现在我的问题很简单:如何使用 TimescaleDB 和此表结构有效地为每个组/聚合函数获取最新/最新版本的版本化时间序列?

欢迎任何提示!

【问题讨论】:

这样的版本化时间序列很难。哪个时间戳是您的分区键?你还在做什么其他查询?你主要是买最新的吗?您是完全致力于此模型还是对其他数据模型持开放态度? 嗨大卫!感谢您的回答。分区键是dt_start_utc。查询主要针对较新的值,但有时针对整个表进行一些历史分析。我并没有完全致力于这个数据模型,但这样当然会更容易。欢迎替代方法和想法! 我同时发现了一个使用 TimescaleDBs last() 函数的更高效的查询公式。它的扩展性似乎稍微好一些,我目前正在做一些测试。 压缩对这类表格和使用场景有好处吗? 另外,version_utc 字段上的分区可能是有益的,如果我没听错的话? 【参考方案1】:

我现在找到了使用Continuous Aggregates 的解决方案(将实时聚合作为默认行为)。这可能很明显,但我还是花了一段时间才发现。所以我会在这里分享它以帮助其他有同样问题的人。

查询稍作修改,但示例显示了一般方法。

例子:

    创建一个连续的聚合:

    CREATE MATERIALIZED VIEW latest_forecast_15_min
    WITH (timescaledb.continuous) AS
    SELECT
    time_bucket('15 minutes', dt_start_utc) AS dt_start,
    fc_id,
    fc_power_supplier_id,
    fc_power_type_id,
    last(fc_kwh, version_utc) AS fc_kwh_last
    FROM fc_power_raw_import_normalized
    WHERE dt_start_utc > now () - INTERVAL '2 weeks'
    GROUP BY dt_start, fc_id, fc_power_supplier_id, fc_power_type_id;
    

    添加过去 2 周每小时更新的刷新策略:

    SELECT add_continuous_aggregate_policy('latest_forecast_15_min',
        start_offset => INTERVAL '2 WEEKS',
        end_offset => INTERVAL '1 HOUR',
        schedule_interval => INTERVAL '1 h');
    

    从连续聚合中选择数据:

    SELECT
    dt_start,
    fc_id,
    fc_power_supplier_id,
    fc_power_type_id,
    fc_kwh_last
    FROM latest_forecast_15_min;
    

对于我的示例,这将查询时间从 ~13 seconds 减少到 ~2 seconds,并提供了为不同查询类型(例如分组或时间间隔)添加更多聚合的灵活性。

【讨论】:

以上是关于使用 TimescaleDB 获取每个组/聚合函数的最新/最新版本的版本化时间序列的有效方法的主要内容,如果未能解决你的问题,请参考以下文章

Python 配置文件:如何获取每个单独执行的时间(无聚合)

pandas使用groupby函数agg函数获取每个分组聚合对应的均值(mean)实战:计算分组聚合单数据列的均值计算分组聚合多数据列的均值

oracle分析函数

over(partition by)开窗函数的使用

pandas使用groupby函数agg函数获取每个分组聚合对应的标准差(std)实战:计算分组聚合单数据列的标准差(std)计算分组聚合多数据列的标准差(std)

分析函数之开窗函数over