有没有办法在 xts 中将 period.apply 与 doParallel 和 foreach 一起使用?

Posted

技术标签:

【中文标题】有没有办法在 xts 中将 period.apply 与 doParallel 和 foreach 一起使用?【英文标题】:Is there a way to use period.apply with doParallel and foreach in xts? 【发布时间】:2018-10-08 07:49:08 【问题描述】:

我想在 R 中并行化 period.apply 函数,我正在尝试使用 doParallelForeach,但我不知道如何实现这个函数。我使用的数据是带有日期时间索引和变量值的xts 对象,我想做的是每 5 秒取一次数据的平均值:

                                     VAR
2018-01-01 00:00:00                1945.054
2018-01-01 00:00:02                1944.940
2018-01-01 00:00:05                1945.061
2018-01-01 00:00:07                1945.255
2018-01-01 00:00:10                1945.007
2018-01-01 00:00:12                1944.995

这是我编写的代码示例,但它不起作用:

library(xts)
library(doParallel)
library(foreach)

cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)

ends <- endpoints(x,"secs",5)
m <- foreach(i = 1:length(index(x))) %dopar% period.apply(x,ends,mean)
index(m) <- foreach(m) %dopar% trunc(index(m),"secs")
stopCluster()

有效的代码是这样的,但对于更大的数据库,它需要太多时间:

ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")

有没有更有效的方法?

提前致谢。

【问题讨论】:

读者不清楚 period.apply() 是 'xts' 中的一个函数 - 请更新为 library(xts) 关于“不起作用”,您是否收到错误消息,或者它给您错误的结果,或者它只是没有更快? 执行时间太长,我应该停止执行。 @Riverarodrigoa 您的方法是将任务拆分为(大约)N/5 个作业,每个作业只处理几行,其中 N 是您拥有的数据行数。更有效的是设置 8 个作业,每个作业处理 N/8 行。我看到 Ralf 的回答 ***.com/a/50090842/841830 就是这样做的。 【参考方案1】:

我对这个问题中说明的period.apply() 的表现感到非常沮丧。我的抑郁症变成了一种让抑郁症变得更快的痴迷。所以我用 C 重写了它。这是一个使用它并显示性能改进的示例。

library(xts)  # need the GitHub development version
period_apply <- xts:::period_apply  # not exported

set.seed(21)
x <- .xts(rnorm(1e7), 1:1e7)
e <- endpoints(x, "seconds", 5)

system.time(y <- period.apply(x, e, sum))  # current version
#    user  system elapsed 
#  77.904   0.368  78.462 
system.time(z <- period_apply(x, e, sum))  # new C version
#    user  system elapsed 
#  15.468   0.232  15.741
all.equal(y, z)
# [1] TRUE

因此,对于本示例,它的速度大约快了 5 倍。还有一些事情可以让它更快,但 5x 是一个停下来展示它可能会更好的好地方。如果您想(并且足够勇敢)尝试一下,请查看latest development version。

【讨论】:

【参考方案2】:

您是否在一些简单的数据集上尝试过您的代码?因为一旦我让它运行,它就会多次完成所有工作(x 中的每一行一次)。此外,如果您尝试并行化工作,通常最好让“工作人员”在发回数据之前尽可能多地完成工作。在您的代码中,您有两个连续的 foreach 调用,这会导致额外的通信开销。

我的做法是这样的:

    xts 对象拆分为N 垃圾,确保我们以5 秒的间隔之一进行拆分。 让每个工人为一块做所有的工作。 合并结果。如何选择N

由于split.xts 用于第一步,每个块将具有相同数量的 5s 间隔。但是,要完成的工作量(可能)更多地取决于数据点的数量,而不是 5s 间隔的数量。因此,如果这些块之间的点分布不均匀,那么使用更多的块以及一些负载平衡可能是有意义的。如果点的分布是均匀的,则将N 尽可能大以最小化通信开销是有意义的。这里我采用后一种方法,即设置N 等于内核数。

现在让我们生成一些示例数据并应用您的工作解决方案:

library(xts)
x <- xts(x = runif(100),
         order.by = as.POSIXct("2018-01-01") + 0:99)

ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")

接下来我们设置并行集群:

library(doParallel)
library(foreach)

cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)

现在我们必须拆分xts 对象。在这里,我首先确定整个对象的时间跨度,并以N 5s 为间隔进行分配。

N <- cores
k <- as.integer(ceiling(difftime(max(index(x)), min(index(x)), units = "secs") / (5 * N)))

接下来,我将xts 对象拆分为xts 对象列表,每个对象的长度大致相同:

split_x <- split(x, f = "secs", k = 5 * k)

现在我让foreach 遍历这些块并合并结果:

m2 <- foreach(x = split_x, .packages = c("xts"), .combine = c) %dopar% 
    ends <- endpoints(x,"secs",5)
    m <- period.apply(x, ends, mean)
    index(m) <- trunc(index(m),"secs")
    m

stopCluster(cluster)

万岁,结果是一样的:

all.equal(m, m2)
#> [1] TRUE

【讨论】:

非常感谢!这解决了我的问题。现在它的运行速度提高了 3 倍,之前没有并行化 27 秒,现在是 8 秒。你知道这个时间是否可以减少更多? @Riverarodrigoa 你有多少(真正的)核心?实际数据集有多大?在这 8 秒内是否所有核心都处于 100% 状态,或者是否存在(更长的)活动核心较少的延伸? 抱歉耽搁了,我必须工作的数据集大约是 120 000 000 个观测值,现在我在一个 1028279 obs 的样本中工作(8s 对应于这个样本)。是的,我电脑的所有内核都在这 8 秒内 100% 工作。 @Riverarodrigoa 你有多少核心?不管怎样,你可以试试RcppRoll。首先为了简单起见,没有并行性。 虽然 RcppRoll 是一个非常不错的包,但我相信它只能滚动(重叠)窗口。 period.apply() 用于非重叠窗口。

以上是关于有没有办法在 xts 中将 period.apply 与 doParallel 和 foreach 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

在 R 中将哪个时间序列类用于财务数据?

xts 格式的时间戳和高频包的列名

有没有办法在 C++ 中将 auto 作为参数传递?

有没有办法在 RealityKit 中将视图呈现为实体?

有没有办法在android studio中将音频文件转换为字节数组

有没有办法在opencv c ++中将密集光流转换为稀疏光流?