如何在 Spark 中训练具有稀疏矩阵的随机森林?
Posted
技术标签:
【中文标题】如何在 Spark 中训练具有稀疏矩阵的随机森林?【英文标题】:How can I train a random forest with a sparse matrix in Spark? 【发布时间】:2018-11-14 22:46:36 【问题描述】:考虑这个使用sparklyr
的简单示例:
library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
# Source: table<mytext_spark> [?? x 3]
# Database: spark_connection
text book label
<chr> <chr> <int>
1 SENSE AND SENSIBILITY Sense & Sensibility 0
2 "" Sense & Sensibility 0
3 by Jane Austen Sense & Sensibility 0
4 "" Sense & Sensibility 0
5 (1811) Sense & Sensibility 0
6 "" Sense & Sensibility 0
7 "" Sense & Sensibility 0
8 "" Sense & Sensibility 0
9 "" Sense & Sensibility 0
10 CHAPTER 1 Sense & Sensibility 0
11 "" Sense & Sensibility 0
12 "" Sense & Sensibility 0
13 The family of Dashwood had long been settled in Sussex. Their estate Sense & Sensibility 0
14 was large, and their residence was at Norland Park, in the centre of Sense & Sensibility 0
15 their property, where, for many generations, they had lived in so Sense & Sensibility 0
16 respectable a manner as to engage the general good opinion of their Sense & Sensibility 0
数据框的大小相当小(大约70k
行和14k
唯一词)。
现在,在我的集群上训练 naive bayes
模型只需几秒钟。
首先,我定义pipeline
pipeline <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_naive_bayes( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
model_type = "multinomial",
smoothing = 0,
thresholds = c(1, 1))
然后训练naive bayes
模型
> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
expr min lq mean median uq max neval
model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832 3
现在的问题是,尝试在同一个(实际上很小!!)数据集上运行任何基于 tree
的模型(random forest
、boosted trees
等)都行不通。
pipeline2 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_gbt_classifier( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
max_memory_in_mb = 10240,
cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(
错误:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 69.0 中的任务 0 失败 4 次,最近一次失败: 在阶段 69.0 中丢失任务 0.3(TID 1580、1.1.1.1.1、执行程序 5): java.lang.IllegalArgumentException:大小超过 Integer.MAX_VALUE
我认为这是由于令牌的矩阵表示的稀疏性,但是这里有什么可以做的吗?这是sparklyr
的问题吗? spark
有问题吗?我的代码效率低吗?
谢谢!
【问题讨论】:
【参考方案1】:您收到此错误是因为您实际上达到了我们在 Spark https://issues.apache.org/jira/browse/SPARK-6235 中著名的 2G 限制
解决方案是在将数据输入算法之前重新分区。
这实际上是这篇文章中的两个陷阱:
使用本地数据。 Spark 中基于树的模型需要大量内存。那么,让我们来看看你的代码,它看起来是无害的;
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
那么最后一行是做什么的呢?
copy_to
(不是为大数据集设计的),实际上只是将本地R数据帧复制到一个1分区的Spark DataFrame
因此,您只需对数据重新分区,以确保一旦管道在输入gbt
之前准备好您的数据,分区大小小于 2GB。
因此,您只需执行以下操作即可重新分区数据:
# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
sdf_repartition(partitions = 20)
PS1: max_memory_in_mb
是您为 gbt
提供的内存量,用于计算其统计信息。它与作为输入的数据量没有直接关系。
PS2:如果您没有为执行程序设置足够的内存,您可能会遇到java.lang.OutOfMemoryError : GC overhead limit exceeded
编辑:重新分区数据是什么意思?
在谈论重新分区之前,我们总是可以参考什么是分区的定义。我会尽量简短。
分区是大型分布式数据集的逻辑块。
Spark 使用分区来管理数据,有助于以最小的网络流量并行化分布式数据处理,以便在执行程序之间发送数据。 默认情况下,Spark 会尝试从靠近它的节点将数据读入 RDD。由于 Spark 通常访问分布式分区数据,为了优化转换操作,它会创建分区来保存数据块。
增加分区数将使每个分区的数据更少(或根本没有!)
来源:摘自@JacekLaskowski Mastering Apache Spark book。
但数据分区并不总是正确的,就像在这种情况下一样。所以需要重新分区。 (sdf_repartition
代表sparklyr
)
sdf_repartition
会将您的数据分散并打乱您的节点。即sdf_repartition(20)
将为您的数据创建 20 个分区,而不是在这种情况下您最初拥有的 1 个。
我希望这会有所帮助。
整个代码:
library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)
pipeline <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_naive_bayes( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
model_type = "multinomial",
smoothing = 0,
thresholds = c(1, 1))
library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
pipeline2 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_gbt_classifier( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
max_memory_in_mb = 10240, # this is amount of data that can be use for
cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
pipeline3 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')
model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7>
# Stages
# |--1 RegexTokenizer (Transformer)
# | <regex_tokenizer_1ce4342b543b>
# | (Parameters -- Column Names)
# | input_col: text
# | output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# | <count_vectorizer_1ce4e0e6489>
# | (Parameters -- Column Names)
# | input_col: mytoken
# | output_col: finaltoken
# | (Transformer Info)
# | vocabulary: <list>
# |--3 GBTClassificationModel (Transformer)
# | <gbt_classifier_1ce41ab30213>
# | (Parameters -- Column Names)
# | features_col: finaltoken
# | label_col: label
# | prediction_col: pcol
# | probability_col: prcol
# | raw_prediction_col: rpcol
# | (Transformer Info)
# | feature_importances: num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...
# | num_classes: int 2
# | num_features: int 39158
# | total_num_nodes: int 540
# | tree_weights: num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...
# | trees: <list>
【讨论】:
数据在磁盘上小于 6MB,在内存中小于 4.1 MB。这与 2GB 的限制相差甚远(甚至纠正了 Spark 的低效率)只是说...... :) 我同意,但出于某种原因。根据日志,1 个分区中的 250k 词汇达到 2005mb... 我认为这并不是真正的核心问题。看看特征提取管道,思考如何导出决策树。 @ℕʘʘḆḽḘ 也许,但这是一个冗长的答案,并不完全适合 SO。通过细微的调整,我可以将模型安装在单个虚拟内核/1GB 内存上,尽管它需要很长时间(大约 8 小时左右,对于 5MB 的数据并不完全令人满意)。如果没有人提供完全令人满意的答案,我将在本周晚些时候尝试重新访问,并提供一些建议。 @ℕʘʘḆḽḘ 你的集群设置怎么样? (内存、资源管理器等)【参考方案2】:能否提供完整的错误回溯?
我的猜测是您的内存不足。随机森林和 gbt 树是集成模型,因此它们比朴素贝叶斯需要更多的内存和计算能力。
尝试重新分区数据(spark.sparkContext.defaultParallelism 值是一个很好的起点),以便您的每个工作人员获得更小且分布更均匀的块。
如果这不起作用,请尝试将您的 max_memory_in_mb
参数减少到 256
。
【讨论】:
不,但等等,我有一个巨大的集群,这个数据集相对较小。我不认为这可能是由于内存错误。你能试着站在你这边,告诉我你得到了什么错误吗?为什么减少max_memory_in_mb
在这里有用?谢谢以上是关于如何在 Spark 中训练具有稀疏矩阵的随机森林?的主要内容,如果未能解决你的问题,请参考以下文章