r 并行化RDS压缩/解压缩以提高R中的序列化性能
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了r 并行化RDS压缩/解压缩以提高R中的序列化性能相关的知识,希望对你有一定的参考价值。
# The functions below use parallelized versions of gzip, xz, and bzip2 to
# improve compression/decompression performance of RDS serialization in R.
# Each function searches for the appropriate program (based on the required
# compression format) and if found, offloads the compression handling to the
# external program and therefore leaves R free to do the data import/export.
# The two main functions (saveRDS and readRDS) mask R's native read and write
# functions. The functions have been only tested on macOS, but they must work
# on any Linux/Unix.
#
# Requires the following packages: pxz, pbzip2, and pigz.
#
# Run the following line at the command prompt before using the functions.
#
# brew install pigz pbzip2 pigz
#
library(parallel)
saveRDS.xz <-
function(object, file, threads = parallel::detectCores()) {
pxzAvail <- any(grepl("(XZ Utils)", system("pxz -V", intern = TRUE)))
if (pxzAvail) {
con <- pipe(paste0("pxz -T", threads, " > ", file), "wb")
base::saveRDS(object, file = con)
close(con)
} else {
base::saveRDS(object, file = file, compress = "xz")
}
}
readRDS.xz <- function(file, threads = parallel::detectCores()) {
pxzAvail <- any(grepl("(XZ Utils)", system("pxz -V", intern = TRUE)))
if (pxzAvail) {
con <- pipe(paste0("pxz -d -k -c -T", threads, " ", file))
object <- base::readRDS(file = con)
close(con)
} else {
object <- base::readRDS(file)
}
return(object)
}
saveRDS.gz <-
function(object,
file,
threads = parallel::detectCores(),
compression_level = 6) {
pigzAvail <- any(grepl("pigz", system("pigz -V 2>&1", intern = TRUE)))
if (pigzAvail) {
con <-
pipe(paste0("pigz -c", compression_level, " -p", threads, " > ", file),
"wb")
base::saveRDS(object, file = con)
close(con)
} else {
base::saveRDS(object, file = file, compress = "gzip")
}
}
readRDS.gz <- function(file, threads = parallel::detectCores()) {
pigzAvail <- any(grepl("pigz", system("pigz -V 2>&1", intern = TRUE)))
if (pigzAvail) {
con <- pipe(paste0("pigz -d -c -p", threads, " ", file))
object <- base::readRDS(file = con)
close(con)
} else {
object <- base::readRDS(file)
}
return(object)
}
saveRDS.bz2 <-
function(object,
file,
threads = parallel::detectCores(),
compression_level = 6) {
pbz2Avail <-
any(grepl("Parallel BZIP2", system("pbzip2 -V 2>&1", intern = TRUE)))
if (pbz2Avail) {
con <-
pipe(paste0("pbzip2 -c", compression_level, " -p", threads, " > ", file),
"wb")
base::saveRDS(object, file = con)
close(con)
} else {
base::saveRDS(object, file = file, compress = "bzip2")
}
}
readRDS.bz2 <- function(file, threads = parallel::detectCores()) {
pbz2Avail <-
any(grepl("Parallel BZIP2", system("pbzip2 -V 2>&1", intern = TRUE)))
if (pbz2Avail) {
con <- pipe(paste0("pbzip2 -d -c -p", threads, " ", file))
object <- base::readRDS(file = con)
close(con)
} else {
object <- base::readRDS(file)
}
return(object)
}
readRDS <- function(file, threads = parallel::detectCores()) {
if (!file.exists(file)) {
stop(paste0(file, " does not exist!"))
}
fileDetails <- system2("file", args = file, stdout = TRUE)
selector <-
sapply(c("gzip", "XZ", "BZ"), function (x) {
grepl(x, fileDetails)
})
format <- names(selector)[selector]
if (length(format) == 0) {
format <- "not found"
}
if (format == "gzip") {
object <- readRDS.gz(file, threads = threads)
} else if (format == "XZ") {
object <- readRDS.xz(file, threads = threads)
} else if (format == "bzip2") {
object <- readRDS.bz2(file, threads = threads)
} else {
object <- base::readRDS(file)
}
return(object)
}
saveRDS <- function(object,
file = "",
compress = TRUE) {
if (compress %in% c(TRUE, "gz", "gzip")) {
saveRDS.gz(object, file)
} else if (compress %in% c("bzip", "bzip2", "bz", "bz2")) {
saveRDS.bz2(object, file)
} else if (compress %in% c("xz", "7zip", "7z")) {
saveRDS.xz(object, file)
} else if (compress == FALSE) {
base::saveRDS(object, file)
} else {
stop(paste0(compress, " is not a recognized compression method!"))
}
}
以上是关于r 并行化RDS压缩/解压缩以提高R中的序列化性能的主要内容,如果未能解决你的问题,请参考以下文章