rollapply 使用 sparklyr 处理大数据

Posted

技术标签:

【中文标题】rollapply 使用 sparklyr 处理大数据【英文标题】:rollapply for large data using sparklyr 【发布时间】:2018-02-11 22:29:50 【问题描述】:

我想估计大约 2250 万个观察值的数据集的滚动风险价值,因此我想使用 sparklyr 进行快速计算。这是我所做的(使用示例数据库):

library(PerformanceAnalytics)
library(reshape2)
library(dplyr)

data(managers)
data <- zerofill(managers)
data<-as.data.frame(data)
class(data)
data$date=row.names(data)
lmanagers<-melt(data, id.vars=c('date'))

现在我使用 dplyr 和 PerformanceAnalytics 包估计 VaR:

library(zoo) # for rollapply()
var <- lmanagers %>% group_by(variable) %>% arrange(variable,date) %>% 
  mutate(var=rollapply(value, 10,FUN=function(x) VaR(x, p=.95, method="modified",align = "right"), partial=T))

这很好用。现在我这样做是为了利用 sparklyr:

library(sparklyr)
sc <- spark_connect(master = "local")
lmanagers_sp <- copy_to(sc,lmanagers)
src_tbls(sc)

var_sp <- lmanagers_sp %>% group_by(variable) %>% arrange(variable,date) %>% 
  mutate(var=rollapply(value, 10,FUN=function(x) VaR(x, p=.95, method="modified",align = "right"), partial=T)) %>% 
  collect

但这会产生以下错误:

Error: Unknown input type: pairlist

谁能告诉我错误在哪里,正确的代码是什么?或者任何其他更快地估计滚动 VaR 的解决方案也值得赞赏。

【问题讨论】:

您确实意识到data$date=row.names(data) 为您提供了character 的向量,而不是Date 的向量?如果你这样做会发生什么data$date &lt;- as.Date(row.names(data)) 【参考方案1】:

让我把你的问题分成两个任务:

如何使用sparklyr 接口进行滚动自联接(即a.manager_id = b.manager_id and a.date &lt; b.date and b.date &lt;= a.date + 10) 如何通过sparklyr 接口使用自定义函数(即VaR

第一个任务可能可以使用dplyr 动词,它支持有限的Window functions, including lead() and lag() 集合。你可能会得到一些非常丑陋的东西,就像(lag(return,1) + lag(return,2) + lag(return,3))/(3 - is.na(lag(return,1)) - is.na(lag(return,2)) - is.na(lag(return,3)) 一样——只是一个通用的例子。 (不幸的是,条件连接,例如日期窗口,在dplyr 中仍然是unsupported - 这个问题似乎经常出现,例如this one。)

直接用 DBI::dbGetQuery() 包裹的 Direct Spark SQL(使用上面描述的条件自联接)编写第一个任务会容易得多。

第二个任务是统计任务,不能简单地使用dplyr或直接SQL完成,它有一个sparklyr不支持的库依赖,所以你需要使用Scala(或Python ) user-defined function (UDF) 来计算 VaR,例如另一个答案中的 already linked。

tl;dr 第一个任务可以通过sparklyr 完成(但使用SQL,而不是dplyr)。第二个任务需要一个外部 UDF,然后您可以通过 sparklyrinvoke()

【讨论】:

【参考方案2】:

对于像sparklyr 这样的自定义dplyr 后端,mutate 目前不支持在其他包中定义的任意 R 函数;因此,rollapply() 目前不受支持。

为了计算sparklyr 中的风险价值,一种方法是使用extend sparklyr using Scala and R 并遵循类似于:Estimating Financial Risk with Apache Spark 的方法。

【讨论】:

以上是关于rollapply 使用 sparklyr 处理大数据的主要内容,如果未能解决你的问题,请参考以下文章

将大型数据集缓存到 spark 内存中时“超出 GC 开销限制”(通过 sparklyr 和 RStudio)

CAPM.beta rollapply

R语言可以处理大的数据吗

如何使用 'sparklyr::replace.na()' 替换一列上的 NaN?

R语言使用zoo包中的rollapply函数计算两个时间序列数据列之间的滚动相关性(Rolling correlations)例如,计算两种商品销售额之间的3个月的滚动相关性

如何使用 sparklyr 过滤部分匹配