rollapply 使用 sparklyr 处理大数据
Posted
技术标签:
【中文标题】rollapply 使用 sparklyr 处理大数据【英文标题】:rollapply for large data using sparklyr 【发布时间】:2018-02-11 22:29:50 【问题描述】:我想估计大约 2250 万个观察值的数据集的滚动风险价值,因此我想使用 sparklyr 进行快速计算。这是我所做的(使用示例数据库):
library(PerformanceAnalytics)
library(reshape2)
library(dplyr)
data(managers)
data <- zerofill(managers)
data<-as.data.frame(data)
class(data)
data$date=row.names(data)
lmanagers<-melt(data, id.vars=c('date'))
现在我使用 dplyr 和 PerformanceAnalytics 包估计 VaR:
library(zoo) # for rollapply()
var <- lmanagers %>% group_by(variable) %>% arrange(variable,date) %>%
mutate(var=rollapply(value, 10,FUN=function(x) VaR(x, p=.95, method="modified",align = "right"), partial=T))
这很好用。现在我这样做是为了利用 sparklyr:
library(sparklyr)
sc <- spark_connect(master = "local")
lmanagers_sp <- copy_to(sc,lmanagers)
src_tbls(sc)
var_sp <- lmanagers_sp %>% group_by(variable) %>% arrange(variable,date) %>%
mutate(var=rollapply(value, 10,FUN=function(x) VaR(x, p=.95, method="modified",align = "right"), partial=T)) %>%
collect
但这会产生以下错误:
Error: Unknown input type: pairlist
谁能告诉我错误在哪里,正确的代码是什么?或者任何其他更快地估计滚动 VaR 的解决方案也值得赞赏。
【问题讨论】:
您确实意识到data$date=row.names(data)
为您提供了character
的向量,而不是Date
的向量?如果你这样做会发生什么data$date <- as.Date(row.names(data))
【参考方案1】:
让我把你的问题分成两个任务:
如何使用sparklyr
接口进行滚动自联接(即a.manager_id = b.manager_id and a.date < b.date and b.date <= a.date + 10
)
如何通过sparklyr
接口使用自定义函数(即VaR
)
第一个任务可能可以使用dplyr
动词,它支持有限的Window functions, including lead()
and lag()
集合。你可能会得到一些非常丑陋的东西,就像(lag(return,1) + lag(return,2) + lag(return,3))/(3 - is.na(lag(return,1)) - is.na(lag(return,2)) - is.na(lag(return,3))
一样——只是一个通用的例子。 (不幸的是,条件连接,例如日期窗口,在dplyr
中仍然是unsupported - 这个问题似乎经常出现,例如this one。)
直接用 DBI::dbGetQuery()
包裹的 Direct Spark SQL(使用上面描述的条件自联接)编写第一个任务会容易得多。
第二个任务是统计任务,不能简单地使用dplyr
或直接SQL完成,它有一个sparklyr
不支持的库依赖,所以你需要使用Scala(或Python ) user-defined function (UDF) 来计算 VaR,例如另一个答案中的 already linked。
tl;dr 第一个任务可以通过sparklyr
完成(但使用SQL,而不是dplyr
)。第二个任务需要一个外部 UDF,然后您可以通过 sparklyr
invoke()
。
【讨论】:
【参考方案2】:对于像sparklyr
这样的自定义dplyr
后端,mutate
目前不支持在其他包中定义的任意 R 函数;因此,rollapply()
目前不受支持。
为了计算sparklyr
中的风险价值,一种方法是使用extend sparklyr using Scala and R 并遵循类似于:Estimating Financial Risk with Apache Spark 的方法。
【讨论】:
以上是关于rollapply 使用 sparklyr 处理大数据的主要内容,如果未能解决你的问题,请参考以下文章
将大型数据集缓存到 spark 内存中时“超出 GC 开销限制”(通过 sparklyr 和 RStudio)
如何使用 'sparklyr::replace.na()' 替换一列上的 NaN?
R语言使用zoo包中的rollapply函数计算两个时间序列数据列之间的滚动相关性(Rolling correlations)例如,计算两种商品销售额之间的3个月的滚动相关性