使用 TimescaleDB 获取每个组/聚合函数的最新/最新版本的版本化时间序列的有效方法
Posted
技术标签:
【中文标题】使用 TimescaleDB 获取每个组/聚合函数的最新/最新版本的版本化时间序列的有效方法【英文标题】:Efficient way to get latest/newest version of versioned time series per group/aggregate function using TimescaleDB 【发布时间】:2021-12-11 23:09:07 【问题描述】:我们正在迁移到 TimescaleDB,并迁移了一些包含超过 4 亿行的大型表,其中包含版本化的时间序列预测。
表格结构如下,dt_start_utc
保存预测的实际日期,version_utc
保存预测的发布日期(越新,越接近实际预测日期):
sandbox_cord=# \d+ fc_power_raw_import_normalized
Table "public.fc_power_raw_import_normalized"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------------------+-----------------------------+-----------+----------+---------+---------+--------------+-------------
dt_start_utc | timestamp without time zone | | not null | | plain | |
fc_id | integer | | not null | | plain | |
fc_kwh | integer | | | | plain | |
fc_power_supplier_id | integer | | not null | | plain | |
fc_power_type_id | integer | | not null | | plain | |
version_utc | timestamp without time zone | | not null | | plain | |
Indexes:
"fc_power_raw_import_normalized2_pkey" PRIMARY KEY, btree (dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id, version_utc)
"fc_power_raw_import_normalize2_dt_start_utc_fc_id_fc_power_s_id" btree (dt_start_utc DESC, fc_id, fc_power_supplier_id, fc_power_type_id, version_utc DESC)
"fc_power_raw_import_normalize2_dt_start_utc_fc_power_supplie_id" btree (dt_start_utc DESC, fc_power_supplier_id, fc_power_type_id)
"fc_power_raw_import_normalized2_dt_start_utc_idx" btree (dt_start_utc DESC)
"fc_power_raw_import_normalized2_version_utc_idx" btree (version_utc DESC)
Triggers:
ts_insert_blocker BEFORE INSERT ON fc_power_raw_import_normalized2 FOR EACH ROW EXECUTE FUNCTION _timescaledb_internal.insert_blocker()
Child tables: _timescaledb_internal._hyper_3_2334_chunk,
_timescaledb_internal._hyper_3_2335_chunk,
_timescaledb_internal._hyper_3_2336_chunk,
_timescaledb_internal._hyper_3_2337_chunk,
_timescaledb_internal._hyper_3_2338_chunk,
_timescaledb_internal._hyper_3_2339_chunk
Access method: heap
...
这里有一些值:
sandbox_cord=# SELECT * FROM fc_power_raw_import_normalized ORDER BY fc_id ASC LIMIT 25;
dt_start_utc | fc_id | fc_kwh | fc_power_supplier_id | fc_power_type_id | version_utc
---------------------+-------+--------+----------------------+------------------+---------------------
2020-08-27 00:00:00 | 9 | 167 | 5 | 1 | 2020-08-23 00:27:03
2020-08-27 00:00:00 | 9 | 150 | 5 | 1 | 2020-08-23 01:12:37
2020-08-27 00:00:00 | 9 | 132 | 5 | 1 | 2020-08-23 07:11:42
2020-08-27 00:00:00 | 9 | 144 | 5 | 1 | 2020-08-23 13:12:11
2020-08-27 00:00:00 | 9 | 161 | 5 | 1 | 2020-08-23 19:13:05
2020-08-27 00:00:00 | 9 | 166 | 5 | 1 | 2020-08-24 01:11:53
...
现在我想通过以下查询获得每组时间序列的最新版本 (fc_id
):
SELECT *
FROM fc_power_raw_import_normalized
WHERE (dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id, version_utc) IN (
SELECT
dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id, MAX(version_utc) version_utc
FROM
fc_power_raw_import_normalized
WHERE
dt_start_utc > now() - INTERVAL '2 weeks'
GROUP BY
dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id
)
AND dt_start_utc > now() - INTERVAL '2 weeks'
ORDER by fc_id, dt_start_utc, version_utc;
UPDATE 或使用 TimescaleDBs last() 函数进行以下查询:
SELECT
dt_start_utc,
fc_id,
fc_power_supplier_id,
fc_power_type_id,
last(fc_kwh, version_utc) AS fc_kwh_last
FROM fc_power_raw_import_normalized
WHERE dt_start_utc > now () - INTERVAL '2 weeks'
GROUP BY dt_start_utc, fc_id, fc_power_supplier_id, fc_power_type_id
ORDER BY dt_start_utc ASC, fc_id ASC;
这会产生:
dt_start_utc | fc_id | fc_kwh | fc_power_supplier_id | fc_power_type_id | version_utc
---------------------+-------+--------+----------------------+------------------+---------------------
2021-10-12 16:45:00 | 19 | 99 | 4 | 1 | 2021-10-12 13:13:50
2021-10-12 16:45:00 | 19 | 99 | 4 | 2 | 2021-10-12 13:14:47
2021-10-12 17:00:00 | 19 | 100 | 4 | 1 | 2021-10-12 13:13:50
2021-10-12 17:00:00 | 19 | 100 | 4 | 2 | 2021-10-12 13:14:47
2021-10-12 17:15:00 | 19 | 103 | 4 | 1 | 2021-10-12 13:13:50
2021-10-12 17:15:00 | 19 | 103 | 4 | 2 | 2021-10-12 13:14:47
2021-10-12 17:30:00 | 19 | 105 | 4 | 1 | 2021-10-12 13:13:50
2021-10-12 17:30:00 | 19 | 105 | 4 | 2 | 2021-10-12 13:14:47
2021-10-12 17:45:00 | 19 | 108 | 4 | 1 | 2021-10-12 13:13:50
2021-10-12 17:45:00 | 19 | 108 | 4 | 2 | 2021-10-12 13:14:47
2021-10-12 18:00:00 | 19 | 108 | 4 | 1 | 2021-10-12 13:13:50
2021-10-12 18:00:00 | 19 | 108 | 4 | 2 | 2021-10-12 13:14:47
2021-10-12 18:15:00 | 19 | 105 | 4 | 1 | 2021-10-12 13:13:50
2021-10-12 18:15:00 | 19 | 105 | 4 | 2 | 2021-10-12 13:14:47
2021-10-12 18:30:00 | 19 | 82 | 4 | 1 | 2021-10-12 18:28:47
2021-10-12 18:30:00 | 19 | 82 | 4 | 2 | 2021-10-12 18:29:59
2021-10-12 18:45:00 | 19 | 82 | 4 | 1 | 2021-10-12 18:28:47
2021-10-12 18:45:00 | 19 | 82 | 4 | 2 | 2021-10-12 18:29:59
2021-10-12 19:00:00 | 19 | 81 | 4 | 1 | 2021-10-12 18:28:47
...
但这实际上很慢 e。 G。 3 个月的计算时间约为 77 秒,而更长的时间范围则需要更多时间。
我现在使用INNER JOIN
或窗口函数尝试了此查询的不同变体,还根据this article 添加了不同的索引。但是这些方法都没有显着提高性能。
另一种方法是使用continuous aggregates,这需要定义一个时间段(请参阅此问题:Continuous aggregates in postgres/timescaledb requires time_bucket-function?)并且似乎不适合这里。底层块大小是我改变的另一件事,但也没有显着改进。
到目前为止,当涉及到这个表结构和查询时,TimescaleDB 的性能并没有比我们优化的“常规”关系数据库 MariaDB 好多少,我希望专用的时间序列数据库具有更好的默认性能.
现在我的问题很简单:如何使用 TimescaleDB 和此表结构有效地为每个组/聚合函数获取最新/最新版本的版本化时间序列?
欢迎任何提示!
【问题讨论】:
这样的版本化时间序列很难。哪个时间戳是您的分区键?你还在做什么其他查询?你主要是买最新的吗?您是完全致力于此模型还是对其他数据模型持开放态度? 嗨大卫!感谢您的回答。分区键是dt_start_utc
。查询主要针对较新的值,但有时针对整个表进行一些历史分析。我并没有完全致力于这个数据模型,但这样当然会更容易。欢迎替代方法和想法!
我同时发现了一个使用 TimescaleDBs last()
函数的更高效的查询公式。它的扩展性似乎稍微好一些,我目前正在做一些测试。
压缩对这类表格和使用场景有好处吗?
另外,version_utc
字段上的分区可能是有益的,如果我没听错的话?
【参考方案1】:
我现在找到了使用Continuous Aggregates 的解决方案(将实时聚合作为默认行为)。这可能很明显,但我还是花了一段时间才发现。所以我会在这里分享它以帮助其他有同样问题的人。
查询稍作修改,但示例显示了一般方法。
例子:
创建一个连续的聚合:
CREATE MATERIALIZED VIEW latest_forecast_15_min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('15 minutes', dt_start_utc) AS dt_start,
fc_id,
fc_power_supplier_id,
fc_power_type_id,
last(fc_kwh, version_utc) AS fc_kwh_last
FROM fc_power_raw_import_normalized
WHERE dt_start_utc > now () - INTERVAL '2 weeks'
GROUP BY dt_start, fc_id, fc_power_supplier_id, fc_power_type_id;
添加过去 2 周每小时更新的刷新策略:
SELECT add_continuous_aggregate_policy('latest_forecast_15_min',
start_offset => INTERVAL '2 WEEKS',
end_offset => INTERVAL '1 HOUR',
schedule_interval => INTERVAL '1 h');
从连续聚合中选择数据:
SELECT
dt_start,
fc_id,
fc_power_supplier_id,
fc_power_type_id,
fc_kwh_last
FROM latest_forecast_15_min;
对于我的示例,这将查询时间从 ~13 seconds
减少到 ~2 seconds
,并提供了为不同查询类型(例如分组或时间间隔)添加更多聚合的灵活性。
【讨论】:
以上是关于使用 TimescaleDB 获取每个组/聚合函数的最新/最新版本的版本化时间序列的有效方法的主要内容,如果未能解决你的问题,请参考以下文章
Python 配置文件:如何获取每个单独执行的时间(无聚合)
pandas使用groupby函数agg函数获取每个分组聚合对应的均值(mean)实战:计算分组聚合单数据列的均值计算分组聚合多数据列的均值
pandas使用groupby函数agg函数获取每个分组聚合对应的标准差(std)实战:计算分组聚合单数据列的标准差(std)计算分组聚合多数据列的标准差(std)