如何在 Spark 中训练具有稀疏矩阵的随机森林?

Posted

技术标签:

【中文标题】如何在 Spark 中训练具有稀疏矩阵的随机森林?【英文标题】:How can I train a random forest with a sparse matrix in Spark? 【发布时间】:2018-11-14 22:46:36 【问题描述】:

考虑这个使用sparklyr的简单示例:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

数据框的大小相当小(大约70k 行和14k 唯一词)。

现在,在我的集群上训练 naive bayes 模型只需几秒钟。 首先,我定义pipeline

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

然后训练naive bayes 模型

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3

现在的问题是,尝试在同一个(实际上很小!!)数据集上运行任何基于 tree 的模型(random forestboosted trees 等)都行不通。

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240,
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(

错误:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 69.0 中的任务 0 失败 4 次,最近一次失败: 在阶段 69.0 中丢失任务 0.3(TID 1580、1.1.1.1.1、执行程序 5): java.lang.IllegalArgumentException:大小超过 Integer.MAX_VALUE

我认为这是由于令牌的矩阵表示的稀疏性,但是这里有什么可以做的吗?这是sparklyr 的问题吗? spark 有问题吗?我的代码效率低吗?

谢谢!

【问题讨论】:

【参考方案1】:

您收到此错误是因为您实际上达到了我们在 Spark https://issues.apache.org/jira/browse/SPARK-6235 中著名的 2G 限制

解决方案是在将数据输入算法之前重新分区。

这实际上是这篇文章中的两个陷阱:

使用本地数据。 Spark 中基于树的模型需要大量内存。

那么,让我们来看看你的代码,它看起来是无害的;

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>% 
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

那么最后一行是做什么的呢?

copy_to(不是为大数据集设计的),实际上只是将本地R数据帧复制到一个1分区的Spark DataFrame

因此,您只需对数据重新分区,以确保一旦管道在输入gbt 之前准备好您的数据,分区大小小于 2GB。

因此,您只需执行以下操作即可重新分区数据:

# 20 is an arbitrary number I chose to test and it seems to work well in this case, 
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <- 
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% 
 sdf_repartition(partitions = 20)

PS1: max_memory_in_mb 是您为 gbt 提供的内存量,用于计算其统计信息。它与作为输入的数据量没有直接关系。

PS2:如果您没有为执行程序设置足够的内存,您可能会遇到java.lang.OutOfMemoryError : GC overhead limit exceeded

编辑:重新分区数据是什么意思?

在谈论重新分区之前,我们总是可以参考什么是分区的定义。我会尽量简短。

分区是大型分布式数据集的逻辑块。

Spark 使用分区来管理数据,有助于以最小的网络流量并行化分布式数据处理,以便在执行程序之间发送数据。 默认情况下,Spark 会尝试从靠近它的节点将数据读入 RDD。由于 Spark 通常访问分布式分区数据,为了优化转换操作,它会创建分区来保存数据块。

增加分区数将使每个分区的数据更少(或根本没有!)

来源:摘自@JacekLaskowski Mastering Apache Spark book。

但数据分区并不总是正确的,就像在这种情况下一样。所以需要重新分区。 (sdf_repartition 代表sparklyr

sdf_repartition 会将您的数据分散并打乱您的节点。即sdf_repartition(20) 将为您的数据创建 20 个分区,而不是在这种情况下您最初拥有的 1 个。

我希望这会有所帮助。

整个代码:

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240, # this is amount of data that can be use for 
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') 

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7> 
#   Stages 
# |--1 RegexTokenizer (Transformer)
# |    <regex_tokenizer_1ce4342b543b> 
# |     (Parameters -- Column Names)
# |      input_col: text
# |      output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# |    <count_vectorizer_1ce4e0e6489> 
# |     (Parameters -- Column Names)
# |      input_col: mytoken
# |      output_col: finaltoken
# |     (Transformer Info)
# |      vocabulary: <list> 
# |--3 GBTClassificationModel (Transformer)
# |    <gbt_classifier_1ce41ab30213> 
# |     (Parameters -- Column Names)
# |      features_col: finaltoken
# |      label_col: label
# |      prediction_col: pcol
# |      probability_col: prcol
# |      raw_prediction_col: rpcol
# |     (Transformer Info)
# |      feature_importances:  num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ... 
# |      num_classes:  int 2 
# |      num_features:  int 39158 
# |      total_num_nodes:  int 540 
# |      tree_weights:  num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ... 
# |      trees: <list> 

【讨论】:

数据在磁盘上小于 6MB,在内存中小于 4.1 MB。这与 2GB 的限制相差甚远(甚至纠正了 Spark 的低效率)只是说...... :) 我同意,但出于某种原因。根据日志,1 个分区中的 250k 词汇达到 2005mb... 我认为这并不是真正的核心问题。看看特征提取管道,思考如何导出决策树。 @ℕʘʘḆḽḘ 也许,但这是一个冗长的答案,并不完全适合 SO。通过细微的调整,我可以将模型安装在单个虚拟内核/1GB 内存上,尽管它需要很长时间(大约 8 小时左右,对于 5MB 的数据并不完全令人满意)。如果没有人提供完全令人满意的答案,我将在本周晚些时候尝试重新访问,并提供一些建议。 @ℕʘʘḆḽḘ 你的集群设置怎么样? (内存、资源管理器等)【参考方案2】:

能否提供完整的错误回溯?

我的猜测是您的内存不足。随机森林和 gbt 树是集成模型,因此它们比朴素贝叶斯需要更多的内存和计算能力。

尝试重新分区数据(spark.sparkContext.defaultParallelism 值是一个很好的起点),以便您的每个工作人员获得更小且分布更均匀的块。

如果这不起作用,请尝试将您的 max_memory_in_mb 参数减少到 256

【讨论】:

不,但等等,我有一个巨大的集群,这个数据集相对较小。我不认为这可能是由于内存错误。你能试着站在你这边,告诉我你得到了什么错误吗?为什么减少 max_memory_in_mb 在这里有用?谢谢

以上是关于如何在 Spark 中训练具有稀疏矩阵的随机森林?的主要内容,如果未能解决你的问题,请参考以下文章

spark-mllib 密集向量和稀疏向量

如何在 python 中的大型数据集上训练随机森林?

spark 随机森林 源码解析

spark 随机森林 源码解析

如何在 Spark 中处理最新的随机森林中的分类特征?

Spark Random Forest classifier 随机森林分类