使用 R doParallel 或 foreach 从 mysql 并行获取数据

Posted

技术标签:

【中文标题】使用 R doParallel 或 foreach 从 mysql 并行获取数据【英文标题】:Fetching data in parallel from mysql using R doParallel or foreach 【发布时间】:2016-04-21 03:26:55 【问题描述】:

我正在尝试使用 R 从 mysql 数据库中并行获取数据。以下代码正在逐一获取数据并且工作正常。但我想通过发送多个查询并将其保存到不同的变量中来加快这个过程。稍后我将在变量中合并时间序列。

library(RMySQL)
dbConnect(MySQL(), user='external', password='xxxxxxx', dbname='GMT_Minute_Data', host='xx.xx.xxx.xxx')

sqlData <-select TradeTime, Open, High, Low, Close from ad where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data1= dbFetch(sqlData, n=-1)
sqlData <-select TradeTime, Open, High, Low, Close from ty where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data2 = dbFetch(sqlData, n=-1)
sqlData <-select TradeTime, Open, High, Low, Close from ax where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data3 = dbFetch(sqlData, n=-1)

connections <- dbListConnections(MySQL())
for(i in connections) dbDisconnect(i)

我尝试使用以下代码并行获取数据:

library(foreach)
library(doParallel)
library(RMySQL)

fetchData<- function(nInst, inst1, inst2, inst3, inst4, inst5, startDate, endDate, con1)

  inst<-NULL
  sqlData <-NULL

  if(nInst==1)
    inst<-inst1
  else if(nInst==2)
    inst<-inst2
  else if(nInst==3)
    inst<-inst3
  else if(nInst==4)
    inst<-inst4
  else if(nInst==5)
    inst<-inst5

  sqlData <- dbSendQuery(con1, paste0('select TradeTime, Open, High, Low, Close from ', inst, ' where tradetime between \'',  startDate, '\' and \'',  endDate, '\'' ))
  data1 = dbFetch(sqlData, n=-1)
  print(head(data1))

  data1 


cluster = makeCluster(5, type = "SOCK")
registerDoParallel(cluster)
mydb <- NULL
clusterEvalQ(cluster, 

  mydb <- dbConnect(MySQL(), user='external', password='xxxxxx', dbname='GMT_Minute_Data', host='xx.xx.xxx.xxx')
  NULL
)


allDataList<-foreach(n =1:2, .verbose=TRUE, .packages=('RMySQL')) %dopar% 
  fetchData(n, inst1, inst2, inst3, inst4, inst5, startDate, endDate, mydb)


stopCluster(cluster)
on.exit(dbDisconnect(mydb))

有时代码只为第一个仪器获取数据,而不为其余仪器获取数据。

如果有人知道解决方案,请提供帮助。

谢谢,

【问题讨论】:

【参考方案1】:

我认为问题在于 foreach 将 mydb 变量自动导出给工作人员,从而破坏了使用它进行初始化的目的 clusterEvalQ。数据库连接无法正确序列化并发送到其他机器,这就是为什么使用clusterEvalQ 手动初始化它很有用。 foreach .verbose=TRUE 选项让您验证 mydb自动导出的。如果它说它是自动导出的,你需要阻止它。

在您的示例中,您可以通过简单地删除 mydb &lt;- NULL 语句来防止自动导出 mydb,但我建议您使用 foreach .noexport='mydb' 选项来确保它永远不会自动导出。这是一个简化的示例:

library(doParallel)

fetchData <- function(ignore) 
  mydb


cluster <- makeCluster(5, type = "SOCK")
registerDoParallel(cluster)

clusterEvalQ(cluster, 
  mydb <- sample(100, 1) # different value for each worker
  NULL
)

r <- foreach(n=1:2, .noexport='mydb', .verbose=TRUE) %dopar%  
  fetchData(n)

在这种情况下,foreach 分析fetchData 函数并注意到它使用了一个名为mydb 的变量。因此,如果在主服务器上定义了mydb,它将自动导出它,除非您告诉它不要这样做。这就是为什么我建议使用.noexport='mydb',即使它没有在本地环境中定义。它可以双重确保您的函数没有使用损坏的数据库连接。

【讨论】:

以上是关于使用 R doParallel 或 foreach 从 mysql 并行获取数据的主要内容,如果未能解决你的问题,请参考以下文章

Foreach和doparallel而不是R中的for循环

使用 foreach 函数和 doParallel 库在 R 中嵌套 for 循环

使用 doparallel 在 foreach 循环内循环

在 R doParallel foreach 循环中运行 ovun.sample

在 R 中使用 doParallel 的 foreach 时,Windows Defender 的 CPU 使用率非常高

R doParallel foreach 对独立工作者进行错误处理