用 DolphinDB 和 Python Celery 搭建一个高性能因子计算平台

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用 DolphinDB 和 Python Celery 搭建一个高性能因子计算平台相关的知识,希望对你有一定的参考价值。

因子挖掘是量化金融研究和交易的核心工作。传统的开发流程中,通常使用 Python 从关系型数据库(如 SqlServer, Oracle 等)读取数据,在 Python 中进行因子计算。随着证券交易规模不断扩大以及交易数据量的激增,用户对因子计算平台的性能提出了更高的要求。传统的因子计算工程面临以下问题:


  • 因子数据量的增长以及 Python 作为计算平台导致的性能瓶颈问题
  • 传统的计算平台框架如何无缝替换问题

本教程聚焦于因子批量化计算这一实际业务场景,将 DolphinDB 作为一个核心的计算工具引入到传统的因子平台。以DolphinDB 为核心的因子计算平台包括:数据同步模块、因子批计算模块和因子调度模块。其中 dataX 作为数据同步工具,负责将原始数据以及增量数据从关系数据库同步存储到 DolphinDB 中;DolphinDB 作为因子计算与存储模块;Celery 作为任务调度框架模块。

DolphinDB 因子计算平台,可以为业务部门提供即时的因子计算服务、大数据批计算服务以及历史因子查询服务等。引入DolphinDB后,既能满足高中低频因子计算的要求,又有丰富的API以及ETL工具实现无缝集成。下文以 WorldQuant 101 Alpha 因子指标库中的1号因子 ​​WQAlpha1​​ 为例来展现出整个因子计算平台的构建流程。


1. 总体架构

基于 DolphinDB 与 Python Celery 框架的因子平台构建,主要包括基于 dataX 的 SQL Server Reader (历史数据同步模块)和 DolphinDB (数据存储与计算模块)的数据源注入,DolphinDB 因子函数的定义和调用,Celery 框架(因子平台调度模块)调用因子函数传递参数和最终以 ​​Dataframe​​ 格式的数据可视化展示等。其整体构图如下所示:

用

1.1 SQL Server 概述

  • 介绍:
    SQL Server 是由 Microsoft 开发和推广的关系数据库管理系统。
    SQL Server 为整个架构中原始数据来源部分。
  • 教程与下载、安装:
    有关 SQL Server 的使用、下载、安装可以参考 ​​SQL Server官方文档​​。

1.2 dataX 概述

  • 介绍:
    dataX 是一个异构数据源离线同步工具,用于实现包括 mysql, Oracle, SQL Server, Postgre 等各种异构数据源之间高效的数据同步功能。
    本教程中的 dolphindbWriter 的 dataX 插件能够帮助用户将 SQL Server 数据导入到 DolphinDB 中。
  • 教程与下载、安装:
    有关 dataX 的使用、安装可以参考 ​​dataX 指南​​,下载请点击 ​​dataX​​。

1.3 DolphinDB 概述

  • 介绍:
    DolphinDB 是一款高性能的时序数据处理框架,用于计算高频因子以及因子存储等。
    本教程将 DolphinDB 作为因子计算的主要工具,并结合自身所特有的函数视图功能实现预定义的因子函数在 Python 中调用 DolphinDB 的 Python API 。
  • 教程与下载、安装:
    有关 DolphinDB 的安装指南可以参考 ​​DolphinDB安装使用指南​​,点击​​下载链接​​下载。其中 Python api 的调用可以参考 ​​Python API for DolphinDB​​。

1.4 Celery 概述

  • 介绍:
    Celery 是一个基于 Python 开发的简单、灵活、可靠的分布式异步消息队列,用于实现异步任务(Async Task)处理,在实际使用过程中需要借助消息中间件(Broker)监控任务执行单元(Worker)的执行情况,并将任务执行结果最终存储到结果存储(Backend)中。
    Celery 具有如下优势:
  • 能够实现异步发起并处理请求的功能,更方便实现了 Python 多线程;
  • 方便集成到诸如 rabbitMQ,DolphinDB 等组件,具有较强的扩展性。

本教程中将 Celery 作为任务调度的框架,并结合 redis 作为消息中间件和结果存储实现对因子计算的任务调用。


注:为了防止在使用 Celery 框架过程中出现诸如 ​​TypeError: __init__() got an unexpected keyword argument username​​ 的报错,建议在安装 Celery 框架后卸载默认的 kombou 库并指定安装 ​​5.1.0​​ 版本的库。

2. 环境部署

注:
1.本教程所介绍的是测试环境的部署,因此所部署的 DolphinDB 服务为单节点版,具体部署教程可以参考 ​​​DolphinDB 单节点部署教程​​​;
2.本教程所使用的 Celery 版本为4.3.0.


  • 硬件环境:

硬件名称

配置信息

主机名

cnserver9

外网 IP

xxx.xxx.xxx.122

操作系统

Linux(内核版本3.10以上)

内存

64 GB

CPU

x86_64(12核心)

  • 软件环境:

软件名称

版本信息

DolphinDB

V2.00.7

SQL Server

2019最新版

dataX

3.0

JDK(安装dataX必备)

1.8.0_xxx

Maven(安装dataX必备)

3.6.1+

Python(安装dataX必备)

2.x

Python(包含需要预先安装 numpy、pandas、Celery 等库以及 DolphinDB 的 Python api )

3.7.9

redis

6.2.7

3. 开发使用范例

3.1 数据介绍

本教程选取了 2020.01.01 - 2021.01.01 间多只股票的每日收盘价格,总数据条数为 544174 条。以下是收盘价格表在 SQL Server 和 DolphinDB 中的数据结构:

字段名

字段含义

数据类型(SQL Server)

数据类型(DolphinDB)

SecurityID

股票代码

varchar

SYMBOL

TradeDate

交易日期

date

DATE

Value

收盘价格

float

DOUBLE

3.2 业务场景与指标介绍

本教程选取了 WorldQuant 101 Alpha 因子指标库中的1号因子 ​​WQAlpha1​​​ 为案例进行计算,有关该指标库的详细内容和该因子的引用方法可以参考 ​​WorldQuant 101 Alpha 因子指标库​​。


3.3 dataX 同步 SQL Server 数据到 DolphinDB

本节介绍如何将 SQL Server 的数据同步到 DolphinDB 中。

注:
1.本教程中默认已经预先构建好了存有数据的 SQL Server 数据库,在以下的流程介绍中不再描述其构建过程;
2.本教程中部署的单节点版的 DolphinDB 服务的对应端口为 ​​​8848​​ 。


  • DolphinDB 数据库表构建:
    在数据导入到 DolphinDB 之前,需要预先在部署的 DolphinDB 服务中构建数据库表,执行如下 DolphinDB 脚本建立数据库 ​​dfs://tick_close​​ 和其中的数据表 ​​tick_close​​:
dbName = "dfs://tick_close"
tbName = "tick_close"
if(existsDatabase(dbName))
dropDatabase(dbName)

db = database(dbName, RANGE, date(datetimeAdd(2000.01M,0..50*12,M)))
name = `SecurityID`TradeDate`Value
type = `SYMBOL`DATE`DOUBLE
schemaTable = table(1:0, name, type)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeDate)
  • 撰写导入配置文件:
    在启动 dataX 执行数据导入命令时,需要先撰写以 json 为格式的配置文件,用来指定数据同步过程中的数据源相关配置。
    在一般情况下,对每一张数据表的同步往往需要对应撰写一个配置文件。本教程中针对 tick_close 数据表分别撰写了如下的 tick_close.json 文件:

"job":
"content": [

"writer":
"parameter":
"dbPath": "dfs://tick_close",
"tableName": "tick_close",
"batchSize": 100,
"userId": "admin",
"pwd": "123456",
"host": "127.0.0.1",
"table": [

"type": "DT_SYMBOL",
"name": "SecurityID"
,
"type": "DT_DATE",
"name": "TradeDate"
,

"type": "DT_DOUBLE",
"name": "Value"

],
"port": 8848
,
"name": "dolphindbwriter"
,
"reader":
"name": "sqlserverreader",
"parameter":
"username": "SA",
"password": "Sa123456",
"column": [
"*"
],
"connection": [

"table": [
"tick_close"
],
"jdbcUrl": [
"jdbc:sqlserver://127.0.0.1:1234;DatabaseName=tick_close"
]


]



],
"setting":
"speed":
"channel": 1



注:本教程涉及到的数据同步仅为历史数据的全量同步,在实际过程中如果有增量同步等的需要,在 ​​writer​​​ 配置中要增加 ​​saveFunctionName​​​ 和 ​​saveFunctionDef​​​ 两个配置,具体用法可以参考​​ 基于 DataX 的 DolphinDB 数据导入工具​​。


  • 执行数据导入命令:
    进入 dataX 的 ​​bin​​ 目录下,分别执行如下命令向 DolphinDB 的 ​​tick_close​​ 数据表中导入数据:
$ python datax.py ../conf/tick_close.json

参数解释:

  • ​datax.py​​:用于启动dataX的脚本,必选
  • ​../conf/tick_close.json​​:存放配置文件的路径,必选

预期输出:

用

3.4 Celery 框架触发 DolphinDB 预定义函数计算

本节介绍如何使用 DolphinDB 脚本实现计算区间收益率的因子函数并使用 Celery 框架调用和触发框架。


  • 消息中间件和结果存储模块 redis 服务的构建:

Celery 框架需要一个消息中间件来发送消息实现任务的调度,同时需要一个结果存储的工具用来存储结果。本教程中我们推荐使用 redis 作为消息中间件和结果存储工具,其部署的端口为 ​​6379​​ 。在实际使用过程中用户可以根据实际情况选择所使用的的工具和部署方法。本教程中关于 redis 的部署流程省略。


  • DolphinDB 因子函数实现流程:

登录机器或使用 DolphinDB GUI 或 VScode 插件连接 DolphinDB 服务使用 DolphinDB 脚本预定义函数。本教程封装了 WorldQuant 101 Alpha 因子指标库中的1号因子 ​​WQAlpha1​​,并以之为案例,其代码实现如下:

/**
* 因子:WorldQuant 101 Alpha 因子指标库中的1号因子 WQAlpha1
参数:
security_id:STRING VECTOR,股票代码序列
begin_date:DATE,区间开始日期
end_date:DATE,区间结束日期
*/
use wq101alpha
defg get_alpha1(security_id, begin_date, end_date)
if (typestr(security_id) == STRING VECTOR && typestr(begin_date) == `DATE && typestr(end_date) == `DATE)
tick_list = select * from loadTable("dfs://tick_close", "tick_close") where TradeDate >= begin_date and TradeDate <= end_date and SecurityID in security_id
alpha1_list=WQAlpha1(panel(tick_list.TradeDate, tick_list.SecurityID, tick_list.Value))
return table(alpha1_list.rowNames() as TradeDate, alpha1_list)

else
print("What you have entered is a wrong type")
return `NULLValue

参数解释:

  • 请求参数:

字段

说明

类型(dolphindb)

备注

是否必填

security_id

股票代码序列

STRING VECTOR

该时间序列中的各元素以日划分


begin_date

区间开始日期

DATE


end_date

区间结束日期

DATE


  • 返回参数:

字段

说明

类型(dolphindb)

备注

returntable

在一定时间区间内某支基金的最大回撤率

IN-MEMORY TABLE

在异常情况下会返回一个类型为STRING的值wrongNum

由于在 Python 代码中使用 Python api 调用 DolphinDB 预定义的函数和在 server 服务内预定义函数的操作属于不同的 ​​session​​,为了使得 Python api 可以成功调用到 DolphinDB 脚本定义的函数,本教程中我们引入了 ​​functionView​​(即函数视图)功能,需要先将函数添加到函数视图中,将函数视图的执行权限给某研究员(admin 用户是不需要赋予权限),具体代码实现如下:

//将该函数加入到函数视图中
addFunctionView(get_alpha1)
//将该函数的调用权限赋予给xxx用户
grant("xxx", VIEW_EXEC, "get_alpha1")
  • Celery 调用因子函数项目构建流程:

本节介绍如何构建一个基于 Celery 框架的项目,本教程所使用的 Celery 安装方式是 ​​pip​​ 命令,登录机器执行如下命令。用户也可以使用其它安装方法:

$ pip install celery==4.3.0 && pip install redis==3.2.0

注:如果出现诸如 ​​TypeError: __init__() got an unexpected keyword argument username​​ 类似的错误,说明安装 Celery 框架同时安装的组件 kombu 库的版本出了问题,建议卸载掉原有版本的库,执行 ​​pip3 install kombu==5.1.0​​ 安装 ​​5.1.0​​ 版本的库。

安装好所需库后,在特定目录下执行如下命令构建项目目录架构和所需文件:

$ mkdir celery_project && touch celery_project/tasks.py celery_project/app.py

执行 ​​tree ./celery_project​​ 命令查看项目目录结构如下:

./celery_project ├── app.py └── tasks.py  0 directories, 2 files

其中,对两个文件中的内容编写分别如下:

​tasks.py​​:该文件用来与 DolphinDB 建立 ​​session​​ 封装调用因子函数,并声明封装的函数为一个可以被 Celery 框架异步调度的任务。

首先,导入所需的 Python 库:

from celery import Celery
import dolphindb as ddb
import numpy as np
import pandas as pd
from datetime import datetime

其次,调用 DolphinDB 的 Python api 与之前 DolphinDB 服务建立 ​​session​​:

s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")

同时,实例化一个 Celery 对象,并设置相关配置:

app = Celery(
celeryApp,
broker=redis://localhost:6379/1,
backend=redis://localhost:6379/2
)
app.conf.update(
task_serializer=pickle,
accept_content=[pickle],
result_serializer=pickle,
timezone=Asia/Shanghai,
enable_utc=True,
)

注:由于在本次计算过程中涉及到 ​​datetime​​ 和 ​​DataFrame​​ 格式数据的传递和返回,而 Celery 默认的序列化方式 ​​json​​ 无法支持这类数据的序列化,因此要设置 ​​task_serializer​​、​​accept_content​​、​​result_serializer​​ 三个参数设置指定序列化方式为 ​​pickle​​ 。

最后,将调用 DolphinDB 的预定义函数封装成一个函数以供调用,并添加 ​​@app.task()​​ 装饰器以说明执行的任务是可供 Celery 调用的异步任务:

@app.task()
def get_alpha1(security_id, begin_date, end_time):
return s.run("get_alpha1", security_id, begin_date, end_time)

注:这里我们使用了 Python 中的数据类型的参数进行传递,其中的 Python 和 DolphinDB 的数据类型的对应关系和使用 DolphinDB 中的数据类型的传参可以参考 ​​Python API for DolphinDB​​ 的1.3节。

  • ​app.py​​:该文件用来与 DolphinDB 建立 ​​session​​ 封装调用因子函数,并声明封装的函数为一个可以被 Celery 框架异步调度的任务。

如下是代码实现。这里我们使用了循环语句并调用 Celery 中的 ​​delay()​​ 函数向 Celery 框架发送两个任务调用请求,同时在每次循环中打印出任务 ​​id​​:

import numpy as np
from tasks import get_alpha1
security_id_list=[["600020", "600021"],["600022", "600023"]]
if __name__ == __main__:
for i in security_id_list:
result = get_alpha1.delay(i, np.datetime64(2020-01-01), np.datetime64(2020-01-31))
print(result)
  • Celery 调用因子函数任务实现流程:

在命令行执行如下语句运行 Celery 框架的 worker 端:

$ celery -A tasks worker --loglevel=info

预期输出:

-------------- celery@cnserver9 v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-3.10.0-1160.53.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2022-11-11 00:10:34
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celeryApp:0x7f597a1d4e48
- ** ---------- .> transport: redis://localhost:6379/1
- ** ---------- .> results: redis://localhost:6379/2
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. tasks.max_drawdown

[2022-11-11 00:10:37,413: INFO/MainProcess] Connected to redis://localhost:6379/1
[2022-11-11 00:10:37,437: INFO/MainProcess] mingle: searching for neighbors
[2022-11-11 00:10:38,465: INFO/MainProcess] mingle: all alone
[2022-11-11 00:10:38,488: INFO/MainProcess] celery@cnserver9 ready.

由于使用该命令运行 ​​worker​​ 后会一直保持在交互模式,因此我们需要和机器建立一个新的会话,在进入Celery 项目目录后执行如下命令向 Celery 框架发送异步任务调用请求:

$ python3 app.py

预期输出:

400a3024-65a1-4ba6-b8a9-66f6558be242
cd830360-e866-4850-aba0-3a07e8738f78

这时我们查看之前处在运行状态的 ​​worker​​ 端,可以查看到异步任务的执行情况和返回的结果信息:

-------------- celery@cnserver9 v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-3.10.0-1160.53.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2022-11-11 00:10:34
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celeryApp:0x7f597a1d4e48
- ** ---------- .> transport: redis://localhost:6379/1
- ** ---------- .> results: redis://localhost:6379/2
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. tasks.max_drawdown

[2022-11-11 00:10:37,413: INFO/MainProcess] Connected to redis://localhost:6379/1
[2022-11-11 00:10:37,437: INFO/MainProcess] mingle: searching for neighbors
[2022-11-11 00:10:38,465: INFO/MainProcess] mingle: all alone
[2022-11-11 00:10:38,488: INFO/MainProcess] celery@cnserver9 ready.
[2022-11-11 00:12:44,365: INFO/MainProcess] Received task: tasks.max_drawdown[400a3024-65a1-4ba6-b8a9-66f6558be242]
[2022-11-11 00:12:44,369: INFO/MainProcess] Received task: tasks.max_drawdown[cd830360-e866-4850-aba0-3a07e8738f78]
[2022-11-11 00:12:44,846: INFO/ForkPoolWorker-63] Task tasks.get_alpha1[400a3024-65a1-4ba6-b8a9-66f6558be242] succeeded in 0.04292269051074982s: TradeDate 600020 600021
0 2020-01-01 NaN NaN
1 2020-01-02 NaN NaN
2 2020-01-03 NaN NaN
3 2020-01-06 NaN NaN
4 2020-01-07 0.5 0.0
5 2020-01-08 0.5 0.0
6 2020-01-09 0.0 0.5
7 2020-01-10 0.0 0.5
8 2020-01-13 0.0 0.5
9 2020-01-14 0.0 0.5
10 2020-01-15 0.5 0.0
11 2020-01-16 0.5 0.0
12 2020-01-17 0.5 0.0
13 2020-01-20 0.5 0.0
14 2020-01-21 0.0 0.5
15 2020-01-22 0.5 0.0
16 2020-01-23 0.5 0.0
17 2020-01-24 0.5 0.0
18 2020-01-27 0.5 0.0
19 2020-01-28 0.0 0.5
20 2020-01-29 0.0 0.5
21 2020-01-30 0.0 0.5
22 2020-01-31 0.0 0.5

[2022-11-11 00:12:45,054: INFO/ForkPoolWorker-1] Task tasks.get_alpha1[cd830360-e866-4850-aba0-3a07e8738f78] succeeded in 0.06510275602340698s: TradeDate 600022 600023
0 2020-01-01 NaN NaN
1 2020-01-02 NaN NaN
2 2020-01-03 NaN NaN
3 2020-01-06 NaN NaN
4 2020-01-07 0.0 0.0
5 2020-01-08 0.0 0.0
6 2020-01-09 0.0 0.0
7 2020-01-10 0.0 0.0
8 2020-01-13 0.0 0.0
9 2020-01-14 0.0 0.0
10 2020-01-15 0.0 0.5
11 2020-01-16 0.0 0.0
12 2020-01-17 0.0 0.5
13 2020-01-20 0.5 0.0
14 2020-01-21 0.5 0.0
15 2020-01-22 0.5 0.0
16 2020-01-23 0.5 0.0
17 2020-01-24 0.0 0.5
18 2020-01-27 0.0 0.0
19 2020-01-28 0.5 0.0
20 2020-01-29 0.5 0.0
21 2020-01-30 0.5 0.0
22 2020-01-31 0.5 0.0

在任务执行结束后,我们也可以查看存储在 redis 中任务执行结果的相应信息。

注:在未启动 Celery 框架终端时我们也可以向 Celery 框架发送异步任务调用请求,但此时由于未启动 ​​worker​​​ 端不能查看任务执行的状况和结果,只能返回一个任务 ​​id​​。


4. 总结

本教程着重讲述了如何将 DolphinDB 引入到传统因子计算平台中来解决传统因子平台的性能瓶颈问题。在经过实际测试后,我们将 Celery 框架的任务异步调用的优势和 DolphinDB 计算存储一体的强大性能优势结合起来,为实际生产过程提供了一个解决案例。

同时,由于篇幅有限,涉及到 SQL Server, dataX, DolphinDB 和 Celery 框架的一些其它操作未能更进一步展示,用户在使用过程中需要按照实际情况进行调整。也欢迎大家对本教程中可能存在的纰漏和缺陷批评指正。


附件

用 Kafka + DolphinDB 实时计算K线

Kafka 是一个高吞吐量的分布式消息中间件,可用于海量消息的发布和订阅。

当面对大量的数据写入时,以消息中间件接收数据,然后再批量写入到时序数据库中,这样可以将消息中间件的高并发能力和时序数据库的高吞吐量联合起来,更好地解决海量数据的实时处理和存储问题。

本篇教程,我们会向大家详细介绍 DolphinDB Kafka 插件的使用方式,并以一个“DolphinDB + Kafka 实时计算k线”的案例,向大家展示 DolphinDB Kafka 插件的最佳实践指南。

1. DolphinDB Kafka 插件介绍

DolphinDB Kafka 插件支持把 DolphinDB 中生产的数据推送到 Kafka,也支持从 Kafka订阅数据,并在DolphinDB中消费。用户可以在 DolphinDB 中实例化 Producer 对象,把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。用户也可以在 DolphinDB 中实例化 Consumer 对象,将 Kafka 中指定 Topic 的数据同步到 DolphinDB。DolphinDB Kafka 插件目前支持以下数据类型的序列化和反序列化:


  • DolphinDB 标量
  • Kafka Java API 的内置类型:String(UTF-8) , Short , Integer , Long , Float , Double , Bytes , byte[] 以及 ByteBuffer
  • 以上数据类型所组成的向量

用

Kafka 插件目前支持版本:​​relsease200​​​, ​​release130​​。本教程基于 Kafka Plugin release200 开发,请使用 DolphinDB 2.00.X 版本 server 进行相关测试。若需要测试其它版本 server,请切换至相应插件分支下载插件包进行测试。


2. 基本使用介绍

2.1 安装 DolphinDB Kafka 插件

用户可根据 DolphinDB server 版本和操作系统下载对应的已经编译好的插件文件,​​官方下载链接​​​。手动编译安装可以参考官网文档教程:​​DolphinDB Kafka 插件官方教程​​。

以 Linux 为例,下载好插件文件后需要添加动态库地址到环境变量中,注意插件安装的路径​<PluginDir>​​,需要根据实际环境修改,本例中插件的安装路径为 ​/DolphinDB/server/plugins/kafka​,执行命令如下:

export LD_LIBRARY_PATH="LD_LIBRARY_PATH:/DolphinDB/server/plugins/kafka"

2.2 使用 DolphinDB Kafka Producer

语法

kafka::producer(config)

  • config:字典类型,表示DolphinDB Kafka Producer 的配置。字典的键是一个字符串,值是一个字符串或布尔值。有关 Kafka 配置的更多信息,请参阅 ​​配置参数列表​​。

该函数调用后,会根据指定配置创建一个 Kafka Producer 实例,并返回句柄。

kafka::produce(producer, topic, key, value, json, [partition])

  • producer:Kafka 生产者的句柄
  • topic:Kafka 的主题
  • key:Kafka 生产者配置字典的键
  • value:Kafka 生产者配置字典的值
  • json:表示是否以 json 格式传递数据
  • partition:可选参数,整数,表示 Kafka 的 broker 分区

该函数调用后会,可以把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。

下面通过例子,展示如何实时同步 DolphinDB 流数据表 ​KafkaTest 中的增量数据到 Kafka 的 ​dolphindb-producer-test​ Topic 中。

用


DolphinDB 中创建 Producer 实例

DolphinDB GUI 连接 DolphinDB 节点后执行以下脚本,加载 DolphinDB Kafka 插件:

tryloadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt") catch(ex)print(ex)

注意:
本例中插件的安装路径为 ​​/DolphinDB/server/plugins/kafka​,用户需要根据自己实际环境进行修改。

每次启动 DolphinDB 服务后,只需手动加载一次即可。也可以设置为自动加载,参考教程:​​自动加载插件教程​​。

DolphinDB GUI 中执行以下脚本, 创建 Producer 实例,注意需要根据实际环境配置 metadata.broker.list 参数:

producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "192.193.168.4:9092"
producer = kafka::producer(producerCfg)

模拟测试数据生成

DolphinDB GUI 中执行以下脚本,模拟测试数据生成:

share streamTable(take(1, 86400) as id, 2020.01.01T00:00:00 + 0..86399 as datetime, rand(1..100, 86400) as val) as `kafkaTest

测试数据共有 86400 行,包含三列:id (INT 类型), datetime(DATETIME 类型)和 val(INT 类型),如下表所示

用

Kafka 创建 Topic : dolphindb-producer-test

使用 Kafka 集群自带的 kafka-topics.sh 终端命令创建 Topic:

bin/kafka-topics.sh --create --topic dolphindb-producer-test --bootstrap-server 192.193.168.4:9092

控制台输出结果:

Created topic dolphindb-producer-test.

DolphinDB 流数据表中的数据同步至 Kafka

DolphinDB GUI 中执行以下脚本,声明自定义流数据表订阅的处理函数:

def sendMsgToKafkaFunc(producer, msg)
try
kafka::produce(producer, "dolphindb-producer-test", 1, msg, true)

catch(ex)
writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex)

DolphinDB GUI 中执行以下脚本,订阅 DolphinDB 的流数据表 ​kafkaTest​​,处理函数是 sendMsgToKafkaFunc,将流数据表内的增量数据实时推送到 Kafka 的 ​dolphindb-producer-test​ Topic 中:

subscribeTable(tableName="kafkaTest", actinotallow="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFuncproducer, msgAsTable=true, recnotallow=true)

验证数据

使用 kafka-console-consumer 命令行工具消费 Topic 为 ​dolphindb-producer-test​ 中的数据。

执行下述语句,首先会输出流数据表中的历史数据,往流数据表中插入新的数据后,kafka-console-consumer 会立即输出新增的数据:

./bin/kafka-console-consumer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic dolphindb-producer-test

控制台会打印已消费的数据,输出结果如下:

... "id":[1,1,...],"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13",...],"val":[73,74,...] "id":[1,1,...],"datetime":["2020.01.01T23:55:12","2020.01.01T23:55:13",...],"val":[88,1,...] ...

接下来在 DolphinDB GUI 中执行以下脚本,往流数据表 ​kafkaTest 中新插入两条数据:

insert into kafkaTest values(2,now(),rand(1..100)) insert into kafkaTest values(2,now(),rand(1..100))

控制台输出结果:

"id":[2],"datetime":["2022.08.16T11:08:27"],"val":[23]
"id":[2],"datetime":["2022.08.16T11:10:42"],"val":[11]

由此验证 DolphinDB Kafka Producer 生产数据的完整性和正确性。


2.3 使用 DolphinDB Kafka Consumer

语法

kafka::consumer(config)

  • config:字典类型,表示 Kafka 消费者的配置。字典的键是一个字符串,值是一个元组。有关 Kafka 配置的更多信息, 请参考 ​​Kafka 使用手册​​。

该函数调用后,会根据指定配置创建一个 Kafka Consumer 实例,并返回句柄。

下面通过例子,展示如何在 DolphinDB 中订阅消费 kafka 中 Topic 为 ​dolphindb-consumer-test​​ 的数据,将其实时同步到流数据表 ​KafkaTest 中。

用

DolphinDB 中创建 Consumer 实例

DolphinDB GUI 中执行以下脚本, 创建 Consumer 实例:

consumerCfg = dict(string, any)
consumerCfg["metadata.broker.list"] = "192.193.168.4:9092"
consumerCfg["group.id"] = "test"
consumer = kafka::consumer(consumerCfg)

DolphinDB 中消费数据

DolphinDB GUI 中执行以下脚本,创建一张共享内存表 ​kafkaTest​:

share table(1:0,`id`timestamp`val,[INT,TIMESTAMP,INT]) as `kafkaTest

DolphinDB GUI 中执行以下脚本,订阅 Kafka 中的 ​dolphindb-consumer-test​ 主题的数据:

topics = ["dolphindb-consumer-test"]
kafka::subscribe(consumer, topics)

注意:订阅函数支持传入一个string类型的向量,实现同时订阅多个topic

DolphinDB GUI 中执行以下脚本,定义多线程轮询处理消费队列:

def parse(x) 
dict = parseExpr(x).eval()
return table(dict[`id] as `id, dict[`timestamp] as `datetime, dict[`val] as `val)


conn = kafka::createSubJob(consumer, kafkaTest, parse, "kafka consumer")

DolphinDB GUI 中执行以下脚本,查看订阅状态:

kafka::getJobStat()

返回:

subscriptionId

user

description

createTimestamp

80773376

admin

kafka consumer

2022.08.19T06:46:06.072

验证数据

使用 kafka-console-producer 终端工具,在控制台输入消息生产到 kafka:

./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --topic dolphindb-consumer-test

控制台输入消息:

"id":1001,"timestamp":1660920813123,"val":1000
"id":1001,"timestamp":1660920814123,"val":2000
"id":1001,"timestamp":1660920815123,"val":3000

用

通过 DolphinDB GUI 查看流数据表中的结果,如下图所示:

用

由此验证 DolphinDB Kafka Consumer 消费数据的完整性和正确性,消费吞吐量相关的信息见第四章。

DolphinDB GUI 中执行以下脚本,取消订阅:

kafka::cancelSubJob(conn)

3. 通过 Kafka 插件实时计算K线

3.1 环境准备

  • 部署 DolphinDB 集群,版本为​v2.00.7​。


  • 部署 Kafka 集群,版本为 ​2.13-3.1.0​。


3.2 生产数据

本节通过 DolphinDB 的 ​​replay 历史数据回放工具​​和 Kafka 插件,把逐笔成交数据实时发送到 Kafka 中。

Kafka 创建 Topic :topic-message

使用 Kafka 集群自带的 kafka-topics.sh 终端命令创建 Topic:

./bin/kafka-topics.sh --create --topic topic-message --bootstrap-server 192.193.168.4:9092

控制台输出结果:

Created topic topic-message.

加载 Kafka 插件并创建 Kafka Producer

DolphinDB GUI 连接 DolphinDB 节点后执行以下脚本:

// 加载插件
path = "/DolphinDB/server/plugins/kafka"
loadPlugin(path + "/PluginKafka.txt")
loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt");

// 定义创建 Kafka Producer 的函数
def initKafkaProducerFunc(metadataBrokerList)
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = metadataBrokerList
return kafka::producer(producerCfg)


// 创建 Kafka Producer 并返回句柄
producer = initKafkaProducerFunc("192.193.168.5:8992")


  • 本例中插件的安装路径为 ​/DolphinDB/server/plugins/kafka​,用户需要根据自己实际环境进行修改。
  • 推荐 Kafka server 和 DolphinDB Server 在同一网段中。

推送数据到 Kafka Topic

DolphinDB GUI 中执行以下脚本:

// 定义推送数据到 KafKa "topic-message" Topic 的函数
def sendMsgToKafkaFunc(dataType, producer, msg)
startTime = now()
try
for(i in msg)
kafka::produce(producer, "topic-message", 1, i, true)

cost = now() - startTime
writeLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")

catch(ex) writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)


// 创建 DolphinDB 流数据表 tickStream
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
share(streamTable(35000000:0, colName, colType), `tickStream)

// 订阅 tickStream,处理函数是 sendMsgToKafkaFunc
subscribeTable(tableName="tickStream", actionName="sendMsgToKafka", offset=-1, handler=sendMsgToKafkaFunc`tick, producer, msgAsTable=true, reconnect=true, batchSize=10000,throttle=1)
getHomeDir()
// 控速回放 DolphinDB 分布式中的历史数据至 tickStream
dbName = "dfs://SH_TSDB_tick"
tbName = "tick"
replayDay = 2021.12.08
testData = select * from loadTable(dbName, tbName) where date(TradeTime)=replayDay, time(TradeTime)>=09:30:00.000, time(TradeTime)<=10:00:00.000 order by TradeTime, SecurityID
submitJob("replay", "replay", replay, testData, objByName("tickStream"), `TradeTime, `TradeTime, 2000, true, 4)

kafka::produce 函数会将任意表结构的 msg 以 json 格式发送至指定的 Kafka topic。此处的 ​​writeLog​

可以使用 kafka-console-consumer 命令行工具消费 Topic 为 ​topic-message​ 中的数据,验证数据是否成功写入:

./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic topic-message

3.3 消费数据

创建消费者,主题并进行订阅

DolphinDB GUI 中执行以下脚本:

// 创建 Kafka Consumer 并返回句柄
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "192.193.168.5:8992"
consumerCfg["group.id"] = "topic-message"
consumer = kafka::consumer(consumerCfg)

// 订阅 Kafka 中的 "topic-message" 主题的数据
topics = ["topic-message"]
kafka::subscribe(consumer, topics);

DolphinDB 订阅 Kafka消息队列中数据

DolphinDB GUI 中执行以下脚本:

// 订阅 Kafka 发布消息,写入流表 tickStream_kafka
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
share(streamTable(35000000:0, colName, colType), `tickStreamkafka)
go

// Kafka 消息解析函数
def parse(mutable dictVar, mutable tickStreamkafka)
try
t = dictVar
t.replaceColumn!(`TradeTime, temporalParse(dictVar[`TradeTime],"yyyy.MM.ddTHH:mm:ss.SSS"))
tickStreamkafka.append!(t);
catch(ex)
print("kafka errors : " + ex)



colType[1] = STRING;
decoder = EncoderDecoder::jsonDecoder(colName, colType, parse, tickStreamkafka, 15, 100000, 0.5)

// 创建 subjob 函数
conn = kafka::createSubJob(consumer, , decoder, "topic-message")

3.4 流计算引擎实时计算 K 线

使用 DolphinDB 内置​​流计算引擎​​​计算分钟 K 线,并将结果输出到名为 ​OHLCVwap​的结果表中。

DolphinDB GUI 中执行以下脚本:

// 创建接收实时计算结果的流数据表
colName = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`Vwap
colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
share(streamTable(2000000:0, colName, colType), `OHLCStream)

// K 线指标计算元表达式
aggrMetrics = <[ first(TradePrice), max(TradePrice), min(TradePrice), last(TradePrice), wavg(TradePrice, TradeQty) ]>

// 创建引擎并将 kafka 中订阅的数据注入流计算引擎
createTimeSeriesEngine(name="OHLCVwap", windowSize=60000, step=60000, metrics=aggrMetrics, dummyTable=objByName("tickStreamkafka"), outputTable=objByName("OHLCStream"), useSystemTime=true, keyColumn=`SecurityID, useWindowStartTime=false)
subscribeTable(tableName="tickStreamkafka", actionName="OHLCVwap", offset=-1, handler=getStreamEngine("OHLCVwap"), msgAsTable=true, batchSize=1000, throttle=1, hash=0)

  • 设置参数 offset 为 - 1,订阅将会从提交订阅时流数据表的当前行开始。
  • 设置 useSystemTime=true,表示时间序列引擎会按照数据注入时间序列引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行计算。


4. 性能测试

4.1 硬件环境

类型

配置

CPU

Intel(R) Xeon(R) Gold 5220R CPU @ 2.20GHz

内存

512 GB

网络带宽

10 Gbps

硬盘

SSD (500 MB/s 读写)

4.2 软件环境

  • DolphinDB:2.00.7
  • 内核版本: Linux 3.10.0-1160.el7.x86_64
  • 操作系统版本:CentOS Linux 7 (Core)
  • Kafka版本:2.13-3.1.0
  • JDK:1.8

4.3 测试结果

测试数据表结构如下:

列名

DolphinDB 数据类型

deviceId

SYMBOL

timestamps

TIMESTAMP

deviceType

SYMBOL

value

DOUBLE

pluginSendTime

TIMESTAMP

pluginReceived

TIMESTAMP

测试结果如下:

数据量

耗时(s)

RPS

吞吐(MB/s)

100W

8

12 万

38

500W

45.2

11 万

37

1000W

92.1

11 万

37

测试结果说明

  • 测试环境为生产级别的常用配置,目的是降低用户选型评估成本
  • 测试结果为执行10次取平均值
  • 指标 RPS 是指每秒消费的记录数


4.4 测试流程

相关说明

  • 启动测试前清空所有数据。
  • 每次测试先把所有数据写入 Kafka,再加载 Kafka 插件同步数据到 DolphinDB中。目的是将同步数据的压力全部集中到 Kafka 插件。
  • 以Kafka插件从收到第一批数据到收到最后一批数据的时间差作为同步数据的总耗时。

测试流程

  • 加载 Kafka 插件并创建 Kafka Producer 发送数据到 Kafka 中(以发送100万条数据为例)

DolphinDB GUI 连接 DolphinDB 执行以下脚本,本例中插件的安装路径为 ​​/DolphinDB/server/plugins/kafka​​,用户需要根据自己实际环境进行修改:

// 加载插件
try
loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")
loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt")
catch(ex)print(ex)

// 创建 Producer
producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "192.193.168.5:8992";
producer = kafka::producer(producerCfg);
kafka::producerFlush(producer);

//向kafka传100万数据
tbl = table("R5L1B3T1N03D01" as deviceId, "2022-02-22 13:55:47.377" as timestamps, "voltage" as deviceType , 1.5 as value )


// 创建 Consume
consumerCfg = dict(STRING, ANY)

consumerCfg["group.id"] = "test10"
consumerCfg["metadata.broker.list"] = "192.193.168.5:8992";

for(i in 1..1000000)
aclMsg = select *, string(now()) as pluginSendTime from tbl;
for(i in aclMsg)
kafka::produce(producer, "test3", "1", i, true);



consumer = kafka::consumer(consumerCfg)
topics=["test10"];
kafka::subscribe(consumer, topics);

for(i in 1..1000000)
aclMsg = select *, string(now()) as pluginSendTime from tbl;
for(i in aclMsg)
kafka::produce(producer, "test10", "1", i, true);

  • 订阅 Kafka 中数据进行消费

// 创建存储解析完数据的表
colDefName = ["deviceId","timestamps","deviceType","value", "pluginSendTime", "pluginReceived"]

colDefType = [SYMBOL,TIMESTAMP,SYMBOL,DOUBLE,TIMESTAMP,TIMESTAMP]
dest = table(1:0, colDefName, colDefType);
share dest as `streamZd

// 解析函数
def temporalHandle(mutable dictVar, mutable dest)
try
t = dictVar
t.replaceColumn!(`timestamps, temporalParse(dictVar[`timestamps],"yyyy-MM-dd HH:mm:ss.SSS"))
t.replaceColumn!(`pluginSendTime, timestamp(dictVar[`pluginSendTime]))
t.update!(`received, now());
dest.append!(t);
catch(ex)
print("kafka errors : " + ex)



// 创建 decoder
name = colDefName[0:5];
type = colDefType[0:5];
type[1] = STRING;
type[4] = STRING;
decoder = EncoderDecoder::jsonDecoder(name, type, temporalHandle, dest, 15, 100000, 0.5)

// 创建subjob函数
kafka::createSubJob(consumer, , decoder, `DecoderKafka)

此时我们观察共享流表的数据量,当达到 100 万条时说明消费完成,测试结束。


5. Kerberos 认证

5.1 什么是 Kerberos ?

Kerberos 是一种基于加密 Ticket 的身份认证协议,支持双向认证且性能较高。主要有三个组成部分:Kdc, Client 和 Service。

生产环境的 Kafka 一般需要开启 Kerberos 认证,为 Kafka 提供权限管理,提高安全性。

用

5.2 前置条件

  • Java 8+
  • kerberos:包括 Kdc 和 Client
  • keytab 证书


5.3 认证相关配置说明

环境相关配置说明

以下是 Kerberos 认证涉及的关键配置信息,具体配置文件的路径根据实际情况调整

  1. 安装 kdc

yum install -y krb5-server krb5-libs krb5-workstation krb5-devel krb5-auth-dialog

2. 配置 /etc/krb5.conf

[realms]
HADOOP.COM =
kdc = cnserver9:88
admin_server = cnserver9:749
default_domain = HADOOP.COM

3. 配置 /var/kerberos/krb5kdc/kadm5.acl

*/admin@HADOOP.COM  *

4. 创建生成 kdc 数据库文件

sudo kdb5_util create -r HADOOP.COM –s

5. 启动 kerberos 服务

sudo systemctl start krb5kdc
sudo systemctl status krb5kdc

6. 安装 kerberos 客户端

yum install -y krb5-devel krb5-workstation krb5-client

7. 启动 kerberos 客户端

sudo kadmin.local -q "addprinc hadoop/admin"

DolphinDB Kafka Plugin 配置说明


  • 关键配置参数说明
  • security.protocol:指定通信协议
  • sasl.kerberos.service.name:指定 service 名称
  • sasl.mechanism:SASL机制,包括 GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
  • sasl.kerberos.keytab:keytab 文件的路径
  • sasl.kerberos.principal:指定 principal


  • 具体代码实现

// 加载插件
tryloadPlugin("/path/to/DolphinDBPlugin/kafka/bin/linux/PluginKafka.txt") catch(ex)print(ex)

// 生产者配置
producerCfg = dict(STRING, ANY);
producerCfg["bootstrap.servers"] = "cnserver9:9992";
producerCfg["security.protocol"] = "SASL_PLAINTEXT";
producerCfg["sasl.kerberos.service.name"] = "kafka";
producerCfg["sasl.mechanism"] = "GSSAPI";
producerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
producerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
producer = kafka::producer(producerCfg);

// 消费者配置
consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = "test"
consumerCfg["bootstrap.servers"] = "cnserver9:9992";
consumerCfg["security.protocol"] = "SASL_PLAINTEXT";
consumerCfg["sasl.kerberos.service.name"] = "kafka";
consumerCfg["sasl.mechanism"] = "GSSAPI";
consumerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
consumerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
consumer = kafka::consumer(consumerCfg)

注意:适配 Kerberos 认证只需修改 Kafka 插件有关生产者和消费者的配置即可,其余脚本无需改动。

6. 其他说明

本教程展示了 DolphinDB Kafka Plugin 中常用的接口函数,完整的函数支持请参考官网文档:​​DolphinDB Kafka 插件官方教程​

使用过程中如果遇到任何问题,欢迎大家在项目仓库反馈

7. 参考链接

以上是关于用 DolphinDB 和 Python Celery 搭建一个高性能因子计算平台的主要内容,如果未能解决你的问题,请参考以下文章

DolphinDB +Python Airflow 高效实现数据清洗

干货丨Orca对DolphinDB分布式表的操作

如何高效处理面板数据

干货附代码|大数据分析语言DolphinDB脚本语言概述

为啥 dolphindb 脚本中的函数无法访问外部范围内的变量

dolphindb 是不是支持加权最小二乘回归“wls”?