r 并行化RDS压缩/解压缩以提高R中的序列化性能

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了r 并行化RDS压缩/解压缩以提高R中的序列化性能相关的知识,希望对你有一定的参考价值。

# The functions below use parallelized versions of gzip, xz, and bzip2 to
# improve compression/decompression performance of RDS serialization in R.
# Each function searches for the appropriate program (based on the required
# compression format) and if found, offloads the compression handling to the
# external program and therefore leaves R free to do the data import/export.
# The two main functions (saveRDS and readRDS) mask R's native read and write
# functions. The functions have only been tested on macOS, but they should
# work on any Linux/Unix system.
#
# Requires the following packages: pxz, pbzip2, and pigz.
#
# Run the following line at the command prompt before using the functions.
#
#     brew install pxz pbzip2 pigz
#

library(parallel)

saveRDS.xz <-
  function(object, file, threads = parallel::detectCores()) {
    pxzAvail <- any(grepl("(XZ Utils)", system("pxz -V", intern = TRUE)))
    if (pxzAvail) {
      con <- pipe(paste0("pxz -T", threads, " > ", file), "wb")
      base::saveRDS(object, file = con)
      close(con)
    } else {
      base::saveRDS(object, file = file, compress = "xz")
    }
  }

readRDS.xz <- function(file, threads = parallel::detectCores()) {
  pxzAvail <- any(grepl("(XZ Utils)", system("pxz -V", intern = TRUE)))
  if (pxzAvail) {
    con <- pipe(paste0("pxz -d -k -c -T", threads, " ", file))
    object <- base::readRDS(file = con)
    close(con)
  } else {
    object <- base::readRDS(file)
  }
  return(object)
}

saveRDS.gz <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 6) {
    pigzAvail <- any(grepl("pigz", system("pigz -V 2>&1", intern = TRUE)))
    if (pigzAvail) {
      con <-
        pipe(paste0("pigz -c", compression_level, " -p", threads, " > ", file),
             "wb")
      base::saveRDS(object, file = con)
      close(con)
    } else {
      base::saveRDS(object, file = file, compress = "gzip")
    }
  }

readRDS.gz <- function(file, threads = parallel::detectCores()) {
  pigzAvail <- any(grepl("pigz", system("pigz -V 2>&1", intern = TRUE)))
  if (pigzAvail) {
    con <- pipe(paste0("pigz -d -c -p", threads, " ", file))
    object <- base::readRDS(file = con)
    close(con)
  } else {
    object <- base::readRDS(file)
  }
  return(object)
}

saveRDS.bz2 <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 6) {
    pbz2Avail <-
      any(grepl("Parallel BZIP2", system("pbzip2 -V 2>&1", intern = TRUE)))
    if (pbz2Avail) {
      con <-
        pipe(paste0("pbzip2 -c", compression_level, " -p", threads, " > ", file),
             "wb")
      base::saveRDS(object, file = con)
      close(con)
    } else {
      base::saveRDS(object, file = file, compress = "bzip2")
    }
  }

readRDS.bz2 <- function(file, threads = parallel::detectCores()) {
  pbz2Avail <-
    any(grepl("Parallel BZIP2", system("pbzip2 -V 2>&1", intern = TRUE)))
  if (pbz2Avail) {
    con <- pipe(paste0("pbzip2 -d -c -p", threads, " ", file))
    object <- base::readRDS(file = con)
    close(con)
  } else {
    object <- base::readRDS(file)
  }
  return(object)
}

readRDS <- function(file, threads = parallel::detectCores()) {
  if (!file.exists(file)) {
    stop(paste0(file, " does not exist!"))
  }
  fileDetails <- system2("file", args = file, stdout = TRUE)
  selector <-
    sapply(c("gzip", "XZ", "BZ"), function (x) {
      grepl(x, fileDetails)
    })
  format <- names(selector)[selector]
  if (length(format) == 0) {
    format <- "not found"
  }
  if (format == "gzip") {
    object <- readRDS.gz(file, threads = threads)
  } else if (format == "XZ") {
    object <- readRDS.xz(file, threads = threads)
  } else if (format == "bzip2") {
    object <- readRDS.bz2(file, threads = threads)
  } else {
    object <- base::readRDS(file)
  }
  return(object)
}

saveRDS <- function(object,
                    file = "",
                    compress = TRUE) {
  if (compress %in% c(TRUE, "gz", "gzip")) {
    saveRDS.gz(object, file)
  } else if (compress %in% c("bzip", "bzip2", "bz", "bz2")) {
    saveRDS.bz2(object, file)
  } else if (compress %in% c("xz", "7zip", "7z")) {
    saveRDS.xz(object, file)
  } else if (compress == FALSE) {
    base::saveRDS(object, file)
  } else {
    stop(paste0(compress, " is not a recognized compression method!"))
  }
}

以上是关于r 并行化RDS压缩/解压缩以提高R中的序列化性能的主要内容,如果未能解决你的问题,请参考以下文章

R语言并行化基础与提高

Linux中的压缩解压缩

并行化大型动态程序

zip压缩和unzip解压缩

python怎样压缩和解压缩ZIP文件

zip解压缩