重复调用以触发内存中的数据帧而减速
Posted
技术标签:
【中文标题】重复调用以触发内存中的数据帧而减速【英文标题】:Slowdown with repeated calls to spark dataframe in memory 【发布时间】:2019-01-28 04:33:09 【问题描述】:假设我有 40 个连续 (DoubleType
) 变量,我使用 ft_quantile_discretizer
将这些变量分成四分位数。识别所有变量的四分位数非常快,因为该函数支持一次执行多个变量。
接下来,我想对这些分桶变量进行一次热编码,但目前不支持通过一次调用对所有这些变量进行一次热编码的功能。因此,我通过循环遍历变量,一次为每个分桶变量传递ft_string_indexer
、ft_one_hot_encoder
和sdf_separate_column
。这可以完成工作。但是,随着循环的进行,它会大大减慢。我认为它内存不足,但无法弄清楚如何对其进行编程,以便它在变量之间以相同的速度执行。
如果q_vars
是连续变量的变量名字符数组(比如 40 个),我该如何以更高效的方式编写代码?
for (v in q_vars)
data_sprk_q<-data_sprk_q %>%
ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]])
我还尝试将所有变量都作为单个大型管道执行,但这也没有解决问题,所以我认为它与循环本身没有任何关系。
test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))
任何帮助将不胜感激。
【问题讨论】:
为什么要对声称已离散化的数据应用 StringIndexer?你为什么使用sdf_separate_column
?
@user6910411 我正在使用 sdf_separate_column 来拆分以其他方式存储为列表的列。在我的模型中,我想允许灵活地包含哪些桶,并且据我了解,如果我将一个热编码列表留在其中,我将无法做到这一点(除非我应用了一些正则化,删除系数为 0)。我正在应用 StringIndexer 来控制哪些存储桶获得哪些名称,因为至少在我的理解中,带有分隔列的默认命名与存储桶的频率有关。
【参考方案1】:
一般来说,由于 Catalyst 优化器的线性复杂性比线性复杂性更差,长 ML 流水线会出现一些(有时是相当大的)减速。除了将流程拆分为多个管道并打破其间的血统(使用检查点并将数据写入持久存储并加载回),目前您无能为力。
但是,您当前的代码在此之上添加了许多问题:
除非你使用超过 10 个桶 StringIndexer
ft_string_indexer(v ,paste0(v, "b"), "keep", string_order_type = "alphabetAsc")
只是复制QuantileDiscretizer
分配的标签。使用字典顺序时,级别数量越多,行为的用处就越少。
可能根本不需要应用 One-Hot-Encoding(在最坏的情况下可能有害),具体取决于下游流程,即使使用线性模型,也可能不是绝对必要的(您可以认为分配的标签是有效的序数,并且记录为名义值,并且增加维度不是期望的结果)。
不过最大的问题是sdf_separate_column
的应用。它
sparklyr
在每个索引上使用UserDefinedFunction
,有效地导致同一行的重复分配、解码和垃圾回收,给集群带来很大压力。
最后但同样重要的是,它丢弃了 Spark ML 广泛使用的列元数据。
我强烈建议不要在这里使用此功能。根据您的 cmets,您似乎希望在将结果传递给其他算法之前对列进行子集化 - 为此您可以使用 VectorSlicer
。
总体而言,您可以将管道重写为
set.seed(1)
df <- copy_to(sc, tibble(x=rnorm(100), y=runif(100), z=rpois(100, 1)))
input_cols <- colnames(df)
discretized_cols <- paste0(input_cols, "_d")
encoded_cols <- paste0(discretized_cols, "_e") %>% setNames(discretized_cols)
discretizer <- ft_quantile_discretizer(
sc, input_cols = input_cols, output_cols = discretized_cols, num_buckets = 10
)
encoders <- lapply(
discretized_cols,
function(x) ft_one_hot_encoder(sc, input_col=x, output_col=encoded_cols[x])
)
transformed_df <- do.call(ml_pipeline, c(list(discretizer), encoders)) %>%
ml_fit(df) %>%
ml_transform(df)
并在需要时申请ft_vector_slicer
。例如,要从x
中获取对应于第一个、第三个和第六个桶的值,您可以:
transformed_df %>%
ft_vector_slicer(
input_col="x_d_e", output_col="x_d_e_s", indices=c(0, 2, 5))
【讨论】:
感谢您非常彻底的回复!这很酷。非常感谢。 我对您的示例有几个后续问题,我试图将其推广到我的应用程序中。 (1) 为什么在do.call(ml_pipeline, ...)
语句中,discretizer
需要在list
内,而encoders
不需要?当我尝试使用与 ft_vector_slicer
类似的方法创建另一个管道时,我收到一条错误消息,提示缺少参数“x”。 (2) 如果每个编码变量的向量切片指令位于一个列表中,您将如何将ft_vector_slicer
指令添加到管道中?以上是关于重复调用以触发内存中的数据帧而减速的主要内容,如果未能解决你的问题,请参考以下文章
我正在尝试遍历字符串数组并触发 api 调用以重新加载集合视图