将 data.table 列快速连接成一个字符串列

Posted

技术标签:

【中文标题】将 data.table 列快速连接成一个字符串列【英文标题】:Fast concatenation of data.table columns into one string column 【发布时间】:2018-06-22 07:58:04 【问题描述】:

给定data.table 中的任意列名列表,我想将这些列的内容连接成单个字符串,存储在新列中。我需要连接的列并不总是相同的,因此我需要动态生成表达式。

我偷偷怀疑我使用 eval(parse(...)) 调用的方式可以用更优雅的方式替换,但下面的方法是迄今为止我能得到的最快的方法。

对于 1000 万行,此示例数据大约需要 21.7 秒(base R paste0 需要稍长一些 - 23.6 秒)。我的实际数据有 18-20 列被连接起来,多达 1 亿行,所以减速变得有点不切实际。

有什么想法可以加快速度吗?


当前方法

library(data.table)
library(stringi)

RowCount <- 1e7
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- c("x","a","b","c","d","e","f","y")
PasteStatement <- stri_c('stri_c(',stri_c(ConcatCols,collapse = ","),')')
print(PasteStatement)

给予

[1] "stri_c(x,a,b,c,d,e,f,y)"

然后使用以下表达式连接列:

DT[,State := eval(parse(text = PasteStatement))]

输出样本:

     x   y a b c d e f        State
1: foo bar 4 8 3 6 9 2 foo483692bar
2: foo bar 8 4 8 7 8 4 foo848784bar
3: foo bar 2 6 2 4 3 5 foo262435bar
4: foo bar 2 4 2 4 9 9 foo242499bar
5: foo bar 5 9 8 7 2 7 foo598727bar

分析结果


更新 1:freadfwritesed

按照@Gregor 的建议,尝试使用sed 在磁盘上进行连接。感谢 data.table 超快的freadfwrite 函数,我能够将列写入磁盘,使用 sed 消除逗号分隔符,然后在大约 18.3 秒内读回后处理的输出 -- 切换的速度不够快,但还是一个有趣的切线!

ConcatCols <- c("x","a","b","c","d","e","f","y")
fwrite(DT[,..ConcatCols],"/home/xxx/DT.csv")
system("sed 's/,//g' /home/xxx/DT.csv > /home/xxx/DT_Post.csv ")
Post <- fread("/home/xxx/DT_Post.csv")
DT[,State := Post[[1]]]

18.3 秒总时间的细分(无法使用 profvis,因为 sed 对 R 分析器不可见)

data.table::fwrite() - 0.5 秒 sed- 14.8 秒 data.table::fread() - 3.0 秒 := - 0.0 秒

如果不出意外,这证明了 data.table 作者在磁盘 IO 性能优化方面所做的大量工作。 (我用的是1.10.5开发版,给fread增加了多线程,fwrite多线程已经有一段时间了。

一个警告:如果有一种解决方法可以使用fwrite 和@Gregor 在下面的另一条评论中建议的空白分隔符来写入文件,那么这种方法可以合理地缩减为 ~ 3.5秒!

对此切线的更新:分叉 data.table 并注释掉需要大于长度 0 的分隔符的行,却神秘地得到了一些空格?在导致一些段错误试图弄乱C 内部之后,我暂时把它放在了冰上。理想的解决方案不需要写入磁盘并将所有内容都保存在内存中。


更新 2:sprintf 用于整数特定情况

这里的第二个更新:虽然我在原始使用示例中包含了字符串,但我的实际用例专门连接整数值(根据上游清理步骤,始终可以假定为非 null)。

由于用例非常具体并且与原始问题不同,我不会直接将时间与之前发布的时间进行比较。然而,一个要点是,虽然stringi 很好地处理了许多字符编码格式、混合向量类型而无需指定它们,并且开箱即用地进行了一堆错误处理,但这确实增加了一些时间(这可能是在大多数情况下都值得)

通过使用基本 R 的 sprintf 函数并让它预先知道所有输入都是整数,我们可以为 500 万行计算 18 个整数列减少大约 30% 的运行时间。 (20.3 秒而不是 28.9)

library(data.table)
library(stringi)
RowCount <- 5e6
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- list("a","b","c","d","e","f")
## Do it 3x as many times
ConcatCols <- c(ConcatCols,ConcatCols,ConcatCols)

## Using stringi::stri_c ---------------------------------------------------
stri_joinStatement <- stri_c('stri_join(',stri_c(ConcatCols,collapse = ","),', sep="", collapse=NULL, ignore_null=TRUE)')
DT[, State := eval(parse(text = stri_joinStatement))]

## Using sprintf -----------------------------------------------------------
sprintfStatement <- stri_c("sprintf('",stri_flatten(rep("%i",length(ConcatCols))),"', ",stri_c(ConcatCols,collapse = ","),")")
DT[,State_sprintf_i := eval(parse(text = sprintfStatement))]

生成的语句如下:

> cat(stri_joinStatement)
stri_join(a,b,c,d,e,f,a,b,c,d,e,f,a,b,c,d,e,f, sep="", collapse=NULL, ignore_null=TRUE)
> cat(sprintfStatement)
sprintf('%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i', a,b,c,d,e,f,a,b,c,d,e,f,a,b,c,d,e,f)


更新 3:R 不必很慢。

根据@Martin Modrák 的回答,我根据data.table 内部专门针对专门的“个位整数”案例:fastConcat 组装了一个小马包。 (不要很快在 CRAN 上寻找它,但您可以通过从 github repo 安装来使用它,风险自负,msummersgill/fastConcat。)

如果有人更了解c,这可能会进一步改进,但目前,它在 2.5 秒 内运行与更新 2 相同的情况 - 大约 8x 比 sprintf() 快,并且比我最初使用的 stringi::stri_c() 方法快 11.5x

对我来说,这凸显了在 R 中一些最简单的操作的性能改进的巨大机会比如基本的字符串向量连接 和更好的调整 c。我猜像@Matt Dowle 这样的人已经看到这个很多年了——要是他有时间重写所有R,而不仅仅是data.frame。


【问题讨论】:

所有stri_c 立即都是一个用于连接字符串的C++ 函数。我认为你无法超越它在 R 中的性能。即使 paste 可以非常快速地编译代码,因此它的性能几乎一样好。 使用命令行工具对数据进行预处理或后处理可能对您有用吗?或者在 SQL 或 Hadoop 中连接数据,或者您正在加载它? 几个想法:(a) 在从 Hadoop 中提取数据时合并列。 Hive、Pig 和 Spark 都支持列连接(据我所知)。 (b) 不幸的是,fread 不允许使用空白分隔符,但 readr::write_delim 可以。它可能太慢了,但值得一试。 (c) sed 可能是您可以从命令行执行的最快操作,但 answers to this question 建议您可以使用不同的语法获得一些加速,特别是如果您复制文件而不是在原地编辑它。 (d) 不知道这是否可行,但看起来fwrite 中的单行输入检查使您无法将"" 指定为分隔符。您可以尝试使用fixInNamespace 删除该行,然后看看它是否允许您使用fwritesep = ""。我以前从未使用过fixInNamespace,但这应该是可行的。悬而未决的问题是sep 不是空字符串是否有更深层次的原因。 提交FR支持sep = ""imo。 【参考方案1】:

C 来救援!

从 data.table 中窃取一些代码,我们可以编写一个运行速度更快的 C 函数(并且可以并行化以更快)。

首先确保您有一个可以使用的 C++ 工具链:

library(inline)

fx <- inline::cfunction( signature(x = "integer", y = "numeric" ) , '
    return ScalarReal( INTEGER(x)[0] * REAL(y)[0] ) ;
' )
fx( 2L, 5 ) #Should return 10

那么这应该可以工作(假设只有整数数据,但代码可以扩展到其他类型):

library(inline)
library(data.table)
library(stringi)

header <- "

//Taken from https://github.com/Rdatatable/data.table/blob/master/src/fwrite.c
static inline void reverse(char *upp, char *low)

  upp--;
  while (upp>low) 
  char tmp = *upp;
  *upp = *low;
  *low = tmp;
  upp--;
  low++;
  


void writeInt32(int *col, size_t row, char **pch)

  char *ch = *pch;
  int x = col[row];
  if (x == INT_MIN) 
  *ch++ = 'N';
  *ch++ = 'A';
   else 
  if (x<0)  *ch++ = '-'; x=-x; 
  // Avoid log() for speed. Write backwards then reverse when we know how long.
  char *low = ch;
  do  *ch++ = '0'+x%10; x/=10;  while (x>0);
  reverse(ch, low);
  
  *pch = ch;


//end of copied code 

"



 worker_fun <- inline::cfunction( signature(x = "list", preallocated_target = "character", columns = "integer", start_row = "integer", end_row = "integer"), includes = header , "
  const size_t _start_row = INTEGER(start_row)[0] - 1;
  const size_t _end_row = INTEGER(end_row)[0];

  const int max_out_len = 256 * 256; //max length of the final string
  char buffer[max_out_len];
  const size_t num_elements = _end_row - _start_row;
  const size_t num_columns = LENGTH(columns);
  const int * _columns = INTEGER(columns);

  for(size_t i = _start_row; i < _end_row; ++i) 
    char *buf_pos = buffer;
    for(size_t c = 0; c < num_columns; ++c) 
      if(c > 0) 
        buf_pos[0] = ',';
        ++buf_pos;
      
      writeInt32(INTEGER(VECTOR_ELT(x, _columns[c] - 1)), i, &buf_pos);
    
    SET_STRING_ELT(preallocated_target,i, mkCharLen(buffer, buf_pos - buffer));
  
return preallocated_target;
" )

#Test with the same data

RowCount <- 5e6
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- list("a","b","c","d","e","f")
## Do it 3x as many times
ConcatCols <- c(ConcatCols,ConcatCols,ConcatCols)


ptm <- proc.time()
preallocated_target <- character(RowCount)
column_indices <- sapply(ConcatCols, FUN = function(x)  which(colnames(DT) == x ))
x <- worker_fun(DT, preallocated_target, column_indices, as.integer(1), as.integer(RowCount))
DT[, State := preallocated_target]
proc.time() - ptm

虽然您的(仅整数)示例在我的 PC 上运行了大约 20 秒,但它在大约 5 秒内运行并且可以轻松并行化。

注意事项:

代码尚未准备好用于生产 - 应对函数输入进行大量健全性检查(尤其是检查所有列的长度是否相同、检查列类型、preallocated_target 大小等) 该函数将其输出放入预先分配的字符向量中,这是非标准且丑陋的(R 通常没有传递引用语义),但允许并行化(见下文)。 最后两个参数是要处理的开始行和结束行,再一次,这是为了并行化 该函数接受列索引而不是列名。所有列都必须是整数类型。 除了输入 data.table 和 preallocated_target 之外,输入必须是整数 不包括函数的编译时间(因为您应该事先编译它 - 甚至可以制作一个包)

并行化

编辑: 由于clusterExport 和R 字符串存储的工作方式,下面的方法实际上会失败。因此并行化可能也需要在 C 中完成,类​​似于在 data.table 中实现的方式。

由于您无法跨 R 进程传递内联编译的函数,因此并行化需要更多工作。为了能够并行使用上述函数,您需要使用 R 编译器单独编译并使用 dyn.load 或将其包装在一个包中或使用分叉后端进行并行(我没有,分叉仅适用于在 UNIX 上)。

然后并行运行看起来像(未测试):

no_cores <- detectCores()

# Initiate cluster
cl <- makeCluster(no_cores)

#Preallocated target and prepare params
num_elements <- length(DT[[1]])
preallocated_target <- character(num_elements)
block_size <- 4096 #No of rows processed at once. Adjust for best performance
column_indices <- sapply(ConcatCols, FUN = function(x)  which(colnames(DT) == x ))

num_blocks <- ceiling(num_elements / block_size)

clusterExport(cl, 
   c("DT","preallocated_target","column_indices","num_elements", "block_size"))
clusterEvalQ(cl, <CODE TO LOAD THE NATIVE FUNCTION HERE>)

parLapply(cl, 1:num_blocks ,
          function(block_id)
          
            throw_away <- 
              worker_fun(DT, preallocated_target, columns, 
              (block_id - 1) * block_size + 1, min(num_elements, block_id * block_size - 1))
            return(NULL)
          )



stopCluster(cl)

【讨论】:

感谢您付出的所有努力!按原样运行您的代码我得到了 8.5 秒 的运行时间,使用 sprintf() 从基线 20.5 秒加快了 2.4 倍。我目前正在尝试逐行工作以尝试了解每个部分在做什么,但这里似乎有一些非常可靠的潜力!我可能会尝试将其放入一个函数包中,以便可以预编译它以使用 OpenMP 并允许非整数、可变长度输入。如果我能做到这一点,那么我想我们可能会有赢家! 不确定你打算在这个兔子洞里走多远,但我最后把这些放在了 github 上的一个包中,地址是 msummersgill/fastConcat。目前只是试图将相同的代码编译成 R 函数,我猜 inline 包正在抽象出我缺少的一些东西,以便将它作为独立的 C/C++ 启动和运行。 老实说,那是我为 R 编写 C 代码的第一次体验,所以我也不知道什么是让 C 代码在包中工作的必要条件 :-) 不过玩得很开心。请注意,内联可以为您提供完整的源代码(只要出现编译时错误,它就会显示它)。我相信 OpenMP 的东西和其他列类型的 writeXX 函数可以很容易地从 data.table 中的 fwrite.c 中获取,只需稍作修改即可使其实际工作。 感谢内联提示!我从inline::code(worker_fun) 的输出重新开始,包实际上运行了!现在开始调整它,如果用户指定 sep = ""(我的整数用例),我添加了一个 if-else 以跳过分隔符部分,现在它下降到 3.1 秒,a 从sprintf()baseline 提高 6.6 倍!【参考方案2】:

我不知道样本数据对于您的实际数据有多大的代表性,但对于您的样本数据,您只需将每个独特的 ConcatCols 组合连接一次而不是多次,就可以显着提高性能。

这意味着对于样本数据,如果您也进行所有重复,您将看到约 500k 的连接,而 1000 万。

参见以下代码和时序示例:

system.time(
  setkeyv(DT, ConcatCols)
  DTunique <- unique(DT[, ConcatCols, with=FALSE], by = key(DT))
  DTunique[, State :=  do.call(paste, c(DTunique, sep = ""))]
  DT[DTunique, State := i.State, on = ConcatCols]
)
#       user      system     elapsed 
#      7.448       0.462       4.618 

大约一半的时间花在setkey 部分上。如果您的数据已经被键入,则时间会进一步缩短到 2 秒多一点。

setkeyv(DT, ConcatCols)
system.time(
  DTunique <- unique(DT[, ConcatCols, with=FALSE], by = key(DT))
  DTunique[, State :=  do.call(paste, c(DTunique, sep = ""))]
  DT[DTunique, State := i.State, on = ConcatCols]
)
#       user      system     elapsed 
#      2.526       0.280       2.181 

【讨论】:

这也是一个很好的答案!我的实际数据通常只有大约 10,000 个唯一组合(无论我处理的是 100 万还是 1 亿),因此这对于我的应用程序来说是一种非常有效的方法。使用更具代表性的数据集(1000 万行,18 列,9216 个唯一组合),此方法执行时间 5.2 秒,仅比 根据@Martin Modrák 的回答,msummersgill/fastConcat 中的定制函数 fastConcat::concat() 的运行时间为 3.4 秒 如果您说的唯一组合的最大数量限制在 10k,我怀疑我的方法将比其他答案更好地扩展,例如 1 亿行。但是我没有测试过 非常聪明的把戏!您或许可以将这两种方法结合起来更进一步。 @MartinModrák,谢谢!是的,当然两者的结合应该很快。我也喜欢你的回答,但由于我对 C 代码的理解有限,我更喜欢保持简单,只使用 data.table。 不错!我会使用你的方法,但另一种 data.tablish 方式:DT[, z := do.call(paste0, .BY), by=ConcatCols](需要多 3 倍的时间)【参考方案3】:

这使用来自包tidyrunite。可能不是最快的,但它可能比手动编码的 R 代码更快。

library(tidyr)
system.time(
  DNew <- DT %>% unite(State, ConcatCols, sep = "", remove = FALSE)
)
# user  system elapsed 
# 14.974   0.183  15.343 

DNew[1:10]
# State   x   y a b c d e f
# 1: foo211621bar foo bar 2 1 1 6 2 1
# 2: foo532735bar foo bar 5 3 2 7 3 5
# 3: foo965776bar foo bar 9 6 5 7 7 6
# 4: foo221284bar foo bar 2 2 1 2 8 4
# 5: foo485976bar foo bar 4 8 5 9 7 6
# 6: foo566778bar foo bar 5 6 6 7 7 8
# 7: foo892636bar foo bar 8 9 2 6 3 6
# 8: foo836672bar foo bar 8 3 6 6 7 2
# 9: foo963926bar foo bar 9 6 3 9 2 6
# 10: foo385216bar foo bar 3 8 5 2 1 6

【讨论】:

不确定您正在运行哪个基准测试,但在具有 1000 万行的原始基准上,tidyr::unite() 在我的服务器上花费了 25.7 秒,而基本 R @ 花费了 23.6 秒987654326@。当您查看source code on github 时,这种稍慢的执行非常有意义,因为事实证明,unite 只是围绕基础 R paste() 编写的包装函数。仅整数连接的第二个示例不会产生与其他基准方法相同的结果,因为列不重复。

以上是关于将 data.table 列快速连接成一个字符串列的主要内容,如果未能解决你的问题,请参考以下文章

data.table 具有两个字符串列的集合元素,提取唯一的行,每行未排序

将日期时间列转换为字符串列

将数据框“df1”的字符串列与数据框“df2”的另一个字符串列进行比较,基于哪些其他列进行比较

将具有日期和纪元格式值的字符串列转换为 postgresql/Tableau prep 中的日期列

Spark将布尔列与字符串列进行比较与比较值相等的int和字符串的工作方式不同

data.table 连接然后将列添加到现有的 data.frame 而无需重新复制