使用 QuestDB 和 Apache Kafka 处理时间序列数据
Posted 小小怪物
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 QuestDB 和 Apache Kafka 处理时间序列数据相关的知识,希望对你有一定的参考价值。
Apache Kafka 是一个经过实战考验的分布式流处理平台,在金融行业中很受欢迎,用于处理任务关键型事务工作负载。Kafka 处理大量实时市场数据的能力使其成为交易、风险管理和欺诈检测的核心基础设施组件。金融机构使用 Kafka 从市场数据源、交易数据和其他外部来源流式传输数据,以推动决策。
用于摄取和存储财务数据的常见数据管道涉及将实时数据发布到 Kafka,并利用 Kafka Connect 将其流式传输到数据库。例如,市场数据团队可能会不断更新 Kafka 证券的实时报价,交易团队可能会使用该数据来下达买入/卖出订单。然后,处理后的市场数据和订单可以保存到时间序列数据库中以供进一步分析。
在本文中,我们将创建一个示例数据管道,以说明这在实践中是如何工作的。我们将轮询外部数据源(FinnHub)以获取股票和ETF的实时报价,并将该信息发布到Kafka。然后,Kafka Connect将获取这些记录并将其发布到时间序列数据库(QuestDB)进行分析。
先决条件
- Git
- Docker Engine: 20.10+
- Golang: 1.19+
- FinnHub API Token
设置
若要在本地运行示例,请先克隆存储库。
代码库分为三个部分:
- Golang 代码位于存储库的根目录。
- Kafka Connect QuestDB 映像和 Docker Compose YAML 文件的 Dockerfile 位于 docker 下。
- Kafka Connect 接收器的 JSON 文件位于 kafka-connect-sinks 下。
构建 Kafka Connect QuestDB 映像
我们首先需要使用 QuestDB Sink 连接器构建 Kafka Connect docker 镜像。导航到该目录并运行
docker-compose build
Dockerfile 只是在 Confluent Kafka Connect 基础映像之上通过 Confluent Hub 安装 Kafka QuestDB 连接器:
FROM confluentinc/cp-kafka-connect-base:7.3.2
RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.6
启动 Kafka、Kafka Connect 和 QuestDB
接下来,我们将通过 Docker Compose 设置基础结构。在同一目录中,在后台运行 Docker Compose:
docker-compose up -d
这将启动Kafka + Zookeeper,我们安装了QuestDB连接器的自定义Kafka Connect映像,以及QuestDB。Docker Compose 文件的完整内容请参见原文。
启动 QuestDB Kafka Connect Sink
等待 Docker 容器正常运行(kafka-connect 映像将记录消息),然后我们可以创建 Kafka Connect 接收器。我们将创建两个汇:一个用于特斯拉,一个用于SPY(SPDR S&P 500 ETF),以比较波动股票和整体市场的价格趋势。
发出以下命令以在目录中创建 Tesla 接收器:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @kafka-connect-sinks/questdb-sink-TSLA.json <http://localhost:8083/connectors>
发出以下命令以在目录中创建 SPY 接收器:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @kafka-connect-sinks/questdb-sink-SPY.json <http://localhost:8083/connectors>
流式传输实时股票报价
现在我们已经设置了数据管道,我们准备将实时股票报价流式传输到 Kafka 并将其存储在 QuestDB 中。
首先,我们需要从 Finnhub Stock API 获取一个免费的 API 代币。在线创建一个免费帐户并复制 API 密钥。
将该密钥导出到我们的 shell 下:
export FINNHUB_TOKEN=<my-token-here>
实时报价端点返回各种属性,例如当前价格、最高价/最低价/开盘价以及之前的收盘价。由于我们只对当前价格感兴趣,因此我们只获取价格并将票证符号和时间戳添加到 Kafka JSON 消息中。
详细的 Golang 代码请参见原文。
若要开始流式传输数据,请运行代码:
$ go run main.go
要同时获取 SPY 的数据,请打开另一个终端窗口,将符号的代码修改为 SPY,并使用设置的令牌值运行代码。
结果
运行生产者代码后,它将打印出发送给 Kafka 的消息,例如:“Message published to Kafka: “symbol”:”TSLA”,”price”:174.48,”timestamp”:1678743220215”。这些数据被发送到Kafka主题topic_TSLA,并通过Kafka Connect接收器发送到QuestDB。
然后我们可以导航到 localhost:9000 以访问 QuestDB 控制台。在topic_TSLA表中搜索所有记录,我们可以看到我们的实时市场报价:
SELECT * FROM ‘topic_TSLA’
我们还可以从以下方面查看 SPY 数据:
SELECT * FROM ‘topic_SPY’
现在数据在 QuestDB 中,我们可以通过获取 2m 窗口的平均价格来查询聚合信息:
SELECT avg(price), timestamp FROM topic_SPY SAMPLE BY 2m;
通过获取 2m 窗口的平均,我们可以比较 SPY 和特斯拉的价格趋势。
QuestDB - 在 SAMPLE BY 函数中声明变量或执行数学运算
【中文标题】QuestDB - 在 SAMPLE BY 函数中声明变量或执行数学运算【英文标题】:QuestDB - Declaring variables or performing math in SAMPLE BY function 【发布时间】:2021-12-06 19:08:38 【问题描述】:背景:
我正在使用 Telegraf、QuestDB 和 Grafana 来可视化高频时间序列数据。为了减少 Grafana 查询长时间范围数据的时间,我需要利用 QuestDB 的 SAMPLE BY
函数。
目标:
我需要根据利用 Grafana 的系统变量 $__interval_ms 和任意数字的简单数学方程使我的 QuestDB 查询动态化。
预期:
在下面的查询中,我试图利用 Grafana 的系统变量来使我的查询动态化。我希望 QuestDB 能够理解并执行完成 SAMPLE BY
函数所需的数学运算。
SELECT ts time, last(x), last(y), last(z)
FROM accelerometer
WHERE $__timeFilter(ts)
SAMPLE BY ($__interval_ms/1000)T
Grafana 处理此查询后,将其作为...传递给 QuestDB。
SELECT ts time, last(x), last(y), last(z)
FROM accelerometer
WHERE ts BETWEEN '2021-10-12T00:00:00.000Z' AND '2021-10-12T01:00:00.000Z'
SAMPLE BY (30000/1000)T
注意:如果我将 SAMPLE BY (300000/1000)T
替换为 SAMPLE BY 30T
,查询将按预期执行。
结果:
QuestDB 无法识别我希望执行数学运算并失败。
我已采取的步骤:
我已尝试执行以下解决方法:
-
通过
DECLARE @sampler
声明变量——失败
在查询前通过WITH sampler as (SELECT concat($__interval_ms / 1000, 'T')
执行WITH
语句,然后在末尾通过SAMPLE BY sampler
引用sampler
-- 失败
在 Grafana 变量中执行数学运算(通过其标准仪表板变量 GUI)- 失败
在 SAMPLE BY
函数中嵌入 SELECT
语句 -- 失败
到目前为止,我完全没有运气。
如何在 QuestDB 中存储变量(类似于 Postgres 中的 DECLARE
)或在 SAMPLE by
函数中执行数学运算?或者有没有其他我没有想到的方法来解决这个问题?
【问题讨论】:
【参考方案1】:QuestDB 自 6.0.9 起无法解析非常量 SAMPLE BY
句点,这是一种可以执行 SAMPLE BY 100s
但不能像 SAMPLE BY 1000/10s
这样的算术表达式的方法。
【讨论】:
以上是关于使用 QuestDB 和 Apache Kafka 处理时间序列数据的主要内容,如果未能解决你的问题,请参考以下文章
kafka使用Apache Kafka构建可靠的再处理和死信队列
apache kafka性能测试命令使用和构建kafka-perf