干货附代码|大数据分析语言DolphinDB脚本语言概述

Posted 雷课

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了干货附代码|大数据分析语言DolphinDB脚本语言概述相关的知识,希望对你有一定的参考价值。

开发大数据应用,不仅需要能支撑海量数据的分布式数据库,能高效利用多核多节点的分布式计算框架,更需要一门能与分布式数据库和分布式计算有机融合、高性能易扩展、表达能力强、满足快速开发和建模需要的编程语言。DolphinDB从流行的Python和SQL语言汲取了灵感,设计了大数据处理脚本语言。

干货附代码|大数据分析语言DolphinDB脚本语言概述

提到数据库语言,我们很容易想到标准的SQL语言。不同于标准的SQL,DolphinDB编程语言功能齐全,表达能力非常强大,完美支持命令式编程、向量化编程、函数话编程、SQL编程、远程过程调用编程(RPC)和元编程等多种编程范式。DolphinDB编程语言的语法和表达习惯与Python和SQL非常相似,只要对Python和SQL有一定的了解,就能轻松掌握。相对而言,掌握内存时序数据库kdb+的q语言难度要大得多。


DolphinDB的编程语言能够满足数据科学家快速开发和建模的需求。DolphinDB语言简洁灵活,表达能力强,大大提高了数据科学家的开发效率。DolphinDB支持向量化计算和分布式计算,具有极快的运行速度。下面将详细介绍DolphinDB编程语言的独特之处。

干货附代码|大数据分析语言DolphinDB脚本语言概述


一.命令式编程

与主流的脚本语言Python、JS等,还有强类型语言C、C++、Java等一样,DolphinDB也支持命令式编程。命令式编程是指通过执行一条一条的语句,实现最终目标。DolphinDB的命令式编程主要是用作上层模块的处理和调度。在大数据分析中,由于需要处理的数据量非常庞大,如果我们采用命令式编程逐行处理数据,效率会十分低下,性能也会有所下降。因此,我们推荐在DolphinDB中使用其他编程方式来批量处理数据。

//DolphinDB支持对单变量和多变量进行赋值

x = 1 2 3

y = 4 5

y += 2

x, y = y, x //swap the value of x and y

x, y =1 2 3, 4 5


// 1到100累加求和

s = 0

for(x in 1:101) s += x

print s


//数组中的元素求和

s = 0;

for(x in 1 3 5 9 15) s += x

print s


//打印矩阵每一列的均值

m = matrix(1 2 3, 4 5 6, 7 8 9)

for(c in m) print c.avg()


//计算product表中每一个产品的销售额

t= table(["TV set", "Phone", "PC"] as productId, 1200 600 800 as price, 10 20 7 as qty)

for(row in t) print row.productId + ": " + row.price * row.qty

干货附代码|大数据分析语言DolphinDB脚本语言概述




向量化编程

干货附代码|大数据分析语言DolphinDB脚本语言概述


跟matlab、R等编程语言一样,Dolphin DB 也支持向量化编程。前面提到的kdb+数据库的q语言也是向量处理语言,它在复杂的计算上表现出很好的性能,并且效率很高。DolphinDB的编程语言对很多算法都进行了优化,比如对时间序列数据计算滑动窗口指标,大大提高了向量函数的效率。


//两个长度为1000万的向量相加,采用向量化编程比命令式编程的for语句更加简洁,耗耗时更短。

n = 10000000

a = rand(1.0, n)

b = rand(1.0, n)


//采用for语句编程,需要12秒

c = array(DOUBLE, n)

for(i in 0 : n)

c[i] = a[i] + b[i]

Time elapsed: 12341.043 ms


//采用向量化编程,仅需36毫秒

c = a + b

Time elapsed: 36.901 ms

向量化编程通常是把整个向量加载到连续内存中。有时候因为内存碎片,没有找到连续内存,向量就不可用了。DolphinDB针对这个问题,特意提供了big array数据类型。big array可以把物理上不连续的内存块组成逻辑上连续的向量,即使是非常大的向量,也能在DolphinDB中使用,提高了系统的可用性。

干货附代码|大数据分析语言DolphinDB脚本语言概述



3

函数化编程

干货附代码|大数据分析语言DolphinDB脚本语言概述


DolphinDB支持函数化编程的大部分功能,包括纯函数、自定义函数、λ函数、高阶函数、部分应用和闭包。DolphinDB内置了400多个函数,涵盖了各种数据类型、数据结构和系统调用。


DolphinDB的纯函数特性减少了函数的副作用。在自定义函数时,DolphinDB不能使用函数体外定义的变量。纯函数特性可以大幅度提高代码可读性和软件质量。


3.1 自定义函数


//定义一个函数返回工作日

def getWorkDays(dates){

return dates[def(x):weekday(x) between 1:5]

}


getWorkDays(2018.07.01 2018.08.01 2018.09.01 2018.10.01)


[2018.08.01, 2018.10.01]


上面的例子定义一个函数getWorkDays,该函数受一组日期,返回并返回在周一和周五之间的日期。函数的实现采用了向量的过滤功能,也就是接受一个布尔型单目函数用于数据的过滤。


3.2 高阶函数


下面的一个例子我们使用三个高阶函数pivot、each和cross,干净利落的用三行代码,根据股票日内tick级别的报价数据,计算出两两之间的相关性。


//模拟生成10000000万个数据点(股票代码,交易时间和价格)

n=10000000

syms = rand(`FB`GOOG`MSFT`AMZN`IBM, n)

time = 09:30:00.000 + rand(21600000, n)

price = 500.0 + rand(500.0, n)


//利用pivot函数生成透视表

priceMatrix = pivot(avg, price, time.minute(), syms)

//each和ratios函数的配合使用,为每个股票(矩阵的列)生成每分钟的回报序列

retMatrix = each(ratios, priceMatrix) - 1

//cross和corr函数的配合使用,计算股票两两之间的相关性

corrMatrix = cross(corr, retMatrix, retMatrix)


AMZN      FB        GOOG      IBM       MSFT

--------- --------- --------- --------- ---------

AMZN|1         0.015181  -0.056245 0.005822  0.084104

FB  |0.015181  1         -0.028113 0.034159  -0.117279

GOOG|-0.056245 -0.028113 1         -0.039278 -0.025165

IBM |0.005822  0.034159  -0.039278 1         -0.049922

MSFT|0.084104  -0.117279 -0.025165 -0.049922 1


3.3 部分应用


高阶函数中的函数参数通常对参数有限制,通过部分应用,可以确保参数符合要求。例如,给定一个向量 a = 12 14 18,计算与矩阵中的每一列的相关性。因为要计算矩阵的每一列的相关性,当然可以使用高阶函数each。但是corr函数需要两个参数,而矩阵只提供其中的一个参数,另一个参数必须事先给定,所以部分应用可以解决这个问题。当然我们也可以用for语句来解决这个问题,但代码冗长而低效。


a = 12 14 18

m = matrix(5 6 7, 1 3 2, 8 7 11)


//使用each和部分应用计算矩阵中的每一列与给定向量a的相关性

each(corr{a}, m)


//使用for语句解决上面的问题

cols = m.columns()

c = array(DOUBLE, cols)

for(i in 0:cols)

c[i] = corr(a, m[i])


部分应用的另一个作用是使函数保持状态。例如,在流计算中,用户通常需要给定一个消息处理函数(message handler),接受一条新的信息,返回一个结果。但是我们希望消息处理函数返回的是迄今为止所有数的平均数。这个问题我们可以通过部分应用来解决。


def cumavg(mutable stat, newNum){

stat[0] = (stat[0] * stat[1] + newNum)/(stat[1] + 1)

stat[1] += 1

return stat[0]

}


msgHandler = cumavg{0.0 0.0}

each(msgHandler, 1 2 3 4 5)


[1,1.5,2,2.5,3]

干货附代码|大数据分析语言DolphinDB脚本语言概述



四.SQL编程

干货附代码|大数据分析语言DolphinDB脚本语言概述


DolphinDB的编程语言不仅支持标准的SQL,还针对时间序列数据扩展了SQL的功能,如分组计算(context by)、数据透视(pivot by)、窗口函数、asof连接和窗口连接等,更便于分析时间序列数据。单纯的SQL引擎表达能力有限,很难满足更加复杂的数据分析和算法实现,影响开发效率。在DolphinDB中,脚本语言与SQL语言是完全融合在一起的。


4.1 SQL与编程语言融合


//生成一个员工工资表

emp_wage = table(take(1..10, 100) as id, take(2017.10M + 1..10, 100).sort() as month, take(5000 5500 6000 6500, 100) as wage)


//计算给定的一组员工的平均工资。员工列表存储在一个本地变量empIds中

empIds = 3 4 6 7 9

select avg(wage) from emp_wage where id in empIds group by id

id avg_wage

-- --------

3  5500

4  6000

6  6000

7  5500

9  5500


//除计算平均工资外,同时显示员工的姓名。员工姓名使用一个字典empName来获取。

empName = dict(1..10, `Alice`Bob`Jerry`Jessica`Mike`Tim`Henry`Anna`Kevin`Jones)

select empName[first(id)] as name, avg(wage) from emp_wage where id in empIds group by id

id name    avg_wage

-- ------- --------

3  Jerry   5500

4  Jessica 6000

6  Tim     6000

7  Henry   5500

9  Kevin   5500


上面的例子,SQL语句的where子句和select子句分别用到了上下文中定义的数组和字典,使得本来需要通过子查询和多表联结来解决的问题,通过简单的hash table解决了。如果SQL涉及到分布式数据库,这些上下文变量会自动序列化到需要的节点。这不仅让代码看上去更简洁,有更好的可读性,而且提升了性能。在大数据分析中,很多数据表关联,即使SQL优化器做了很多优化,也难免带来性能问题。


4.2 context by

             ——对面板数据的友好支持


DolphinDB提供了类似其他数据库系统的window function——context by。但是与window function相比,context by的语法更简洁,并且没有那么多限制,可以与select或update一起使用。



//按股票代码进行分组,计算每个股票每天的回报。假设数据是时间顺序排列的。

update trades set ret = ratios(price) - 1.0 context by sym


//按日期进行分组,计算每天每个股票的ret降序排名。

select date, symbol,  ret, rank(ret, false) + 1 as rank from trades where isValid(ret) context by date


//选择每天ret排名前10的股票

select date, symbol, ret from trades where isValid(ret) context by date having rank(ret, false) < 10


4.3 asof join和window join

       ——对时序数据的友好支持


t1 =  table(09:30m 09:31m 09:33m 09:34m as minute, 29.2 28.9 29.3 30.1 as price)

t2 =  table(09:30m 09:31m 09:34m 09:36m as minute,  51.2 52.4 51.9 52.8 as price)

select * from aj(t1, t2, `minute)


minute price t2_minute t2_price

------ ----- --------- --------

09:30m 29.2  09:30m    51.2

09:31m 28.9  09:31m    52.4

09:33m 29.3  09:31m    52.4

09:34m 30.1  09:34m    51.9


上面的例子中,t2中没有与09:33m、09:34m对应的记录,asof join(aj)会分别取t2中在09:33m、09:34m之前最近时间对应的记录,即取t2中09:31m的记录。


p = table(1 2 3 as id, 2018.06M 2018.07M 2018.07M as month)

s = table(1 2 1 2 1 2 as id, 2018.04M 2018.04M 2018.05M 2018.05M 2018.06M 2018.06M as month, 4500 5000 6000 5000 6000 4500 as wage)

select * from wj(p, s, -3:-1,,`id`month)


id month    avg_wage

-- -------- -----------

1  2018.06M 5250

2  2018.07M 4833.333333

3  2018.07M


上面的例子说明了window join(wj)的用法。wj首先取表p第一行记录,即id=1,month=2018.06M。然后在表s中选择id=1并且month在(2018.06M-3)到(2018.06M-1),即2018.03M到2018.05M之间的记录来计算avg(wage)。因此avg_wage=(4500+6000)/2=5250。如此类推。


asof join和window join在金融分析领域有着广泛的应用。一个经典的应用是将交易表和报价表进行关联,计算个股交易成本。详情可以参考使用Window Join快速估计个股交易成本。


4.4 SQL其它扩展


为了满足大数据分析的要求,DolphinDB对SQL还做了很多扩展。比如,用户的自定义函数无需编译、打包或部署,即可在SQL中使用。又比如DolphinDB支持组合字段(Composite Column),可以将复杂分析函数的多个返回值输出到数据表的一行。


factor1=3.2 1.2 5.9 6.9 11.1 9.6 1.4 7.3 2.0 0.1 6.1 2.9 6.3 8.4 5.6

factor2=1.7 1.3 4.2 6.8 9.2 1.3 1.4 7.8 7.9 9.9 9.3 4.6 7.8 2.4 8.7

t=table(take(1 2 3, 15).sort() as id, 1..15 as y, factor1, factor2)


//在输出参数的同时,输出t统计值。使用自定义函数包装输出结果

def myols(y,x){

r=ols(y,x,true,2)

return r.Coefficient.beta join r.RegressionStat.statistics[0]

}

select myols(y,[factor1,factor2]) as `alpha`beta1`beta2`R2 from t group by id


id alpha     beta1     beta2     R2

-- --------- --------- --------- --------

1  1.063991  -0.258685 0.732795  0.946056

2  6.886877  -0.148325 0.303584  0.992413

3  11.833867 0.272352  -0.065526 0.144837



五.远程过程调用编程


干货附代码|大数据分析语言DolphinDB脚本语言概述


DolphinDB与其他系统相比,在远程过程调用(RPC)上的优势主要体现在两个方面:第一,在DolphinDB中,无论是自定义函数还是内置函数,我们都可以通过远程过程调用发送到其他节点上运行,而其他系统不能远程调用与自定义函数相关的函数。第二,DolphinDB的远程过程调用无需编译或者部署。系统会自动把相关函数定义和所需数据序列化到远程节点。数据科学家或数据分析师在编写与远程过程调用相关的函数时,不需要工程师配合编译和部署,可以直接在线使用,极大地提高了开发和分析效率。

下面的例子是使用remoteRun执行远程函数:


h = xdb("localhost", 8081)

//在远程节点上执行一段脚本

remoteRun(h, "sum(1 3 5 7)")

16


//上述远程调用也可以简写成

h("sum(1 3 5 7)")

16


//在远程节点上执行一个在远程节点注册的函数

h("sum", 1 3 5 7)

16


//在远程系节点上执行本地的自定义函数

def mysum(x) : reduce(+, x)

h(mysum, 1 3 5 7)

16


//在远程节点(localhost:8081)上创建一个共享表sales

h("share table(2018.07.02 2018.07.02 2018.07.03 as date, 1 2 3 as qty, 10 15 7 as price) as sales")

//如果本地的自定义函数有依赖,依赖的自定义函数也会序列化到远程节点

defg salesSum(tableName, d): select mysum(price*qty) from objByName(tableName) where date=d

h(salesSum, "sales", 2018.07.02)

40


DolphinDB还提供了与分布式计算相关的函数。mr和imr分别用于开发基于map-reduce和迭代的map-reduce分布式算法。用户只需要指定分布式数据源和定制的核心函数,譬如map函数,reduce函数,final函数等。下面我们先创建一个分布式表,添加一些模拟数据,然后演示开发计算中位数和线性回归的例子。


//模拟生成分布式表sample,用id分区

//y = 0.5 + 3x1 -0.5x2

n=10000000

x1 = pow(rand(1.0,n), 2)

x2 = norm(3.0:1.0, n)

y = 0.5 + 3 * x1 - 0.5*x2 + norm(0.0:1.0, n)

t=table(rand(10, n) as id, y, x1, x2)


login(`admin,"123456")

db = database("dfs://testdb", VALUE, 0..9)

db.createPartitionedTable(t, "sample", "id").append!(t)


利用自定义的map函数myOLSMap,内置的reudce函数加函数(+),自定义的final函数myOLSFinal,以及内置的map-reduce框架函数mr,快速构建了一个在分布式数据源上运行线性回归的函数myOLSEx。


def myOLSMap(table, yColName, xColNames){

x = matrix(take(1.0, table.rows()), table[xColNames])

xt = x.transpose();

return xt.dot(x), xt.dot(table[yColName])

}


def myOLSFinal(result){

xtx = result[0]

xty = result[1]

return xtx.inv().dot(xty)[0]

}


def myOLSEx(ds, yColName, xColNames){

return mr(ds, myOLSMap{, yColName, xColNames}, +, myOLSFinal)

}


//使用自己开发的分布式算法和分布式数据源计算线性回归系数

sample = loadTable("dfs://testdb", "sample")

myOLSEx(sqlDS(


下面这个例子,我们构造一个算法,在分布式数据源上计算一组数据的近似中位数。算法的基本原理是利用bucketCount函数,在每一个节点上分别计算一组bucket内的数据个数,然后把各个节点上的数据累加。这样我们可以找到中位数应该落在哪个区间内。如果这个区间不够小,进一步细分这个区间,直到小于给定的精度要求。中位数的算法需要多次迭代,我们因此使用了迭代计算框架imr。


def medMap(data, range, colName): bucketCount(data[colName], double(range), 1024, true)


def medFinal(range, result){

x= result.cumsum()

index = x.asof(x[1025]/2.0)

ranges = range[1] - range[0]

if(index == -1)

return (range[0] - ranges*32):range[1]

else if(index == 1024)

return range[0]:(range[1] + ranges*32)

else{

interval = ranges / 1024.0

startValue = range[0] + (index - 1) * interval

return startValue : (startValue + interval)

}

}


def medEx(ds, colName, range, precision){

termFunc = def(prev, cur): cur[1] - cur[0] <= precision

return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()

}


//使用自己开发的近似中位数算法,计算分布式数据的中位数。

sample = loadTable("dfs://testdb", "sample")

medEx(sqlDS(



.元编程


干货附代码|大数据分析语言DolphinDB脚本语言概述


DolphinDB支持使用元编程来动态创建表达式,如函数调用的表达式和SQL查询表达式。元编程的一个典型应用是定制报表。用户只需要输入数据表、字段名称和字段格式就能生成报表。具体实现如下:


//根据输入的数据表,字段名称和格式,以及过滤条件,动态生成SQL表达式并执行

def generateReport(tbl, colNames, colFormat, filter){

colCount = colNames.size()

colDefs = array(ANY, colCount)

for(i in 0:colCount){

if(colFormat[i] == "")

colDefs[i] = sqlCol(colNames[i])

else

colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]})

}

return sql(colDefs, tbl, filter).eval()

}


//模拟生成一个100行的数据表

t = table(1..100 as id, (1..100 + 2018.01.01) as date, rand(100.0, 100) as price, rand(10000, 100) as qty)


//输入过滤条件,字段和格式,定制报表。过滤条件使用了元编程。

generateReport(t, ["id","date","price","qty"], ["000","MM/dd/yyyy", "00.00", "#,###"], < id<5 or id>95 >)


id  date       price qty

--- ---------- ----- -----

001 01/02/2018 50.27 2,886

002 01/03/2018 30.85 1,331

003 01/04/2018 17.89 18

004 01/05/2018 51.00 6,439

096 04/07/2018 57.73 8,339

097 04/08/2018 47.16 2,425

098 04/09/2018 27.90 4,621

099 04/10/2018 31.55 7,644

100 04/11/2018 46.63 8,383


DolphinDB编程语言为数据分析而生,天生具备处理海量数据的能力,功能强大,简单易用。









雷课:

       让教育更有质量,

       让教育更有想象!




干货附代码|大数据分析语言DolphinDB脚本语言概述

以上是关于干货附代码|大数据分析语言DolphinDB脚本语言概述的主要内容,如果未能解决你的问题,请参考以下文章

干货丨DolphinDB元编程教程

从 Kdb+ 到 DolphinDB

干货丨如何使用DolphinDB回放加密货币盘口与逐笔交易数据

为啥 dolphindb 脚本中的函数无法访问外部范围内的变量

干货丨时序数据库DolphinDB横截面引擎教程

干货丨Orca写数据教程