Spark, how can I add more storage memory?

Posted: 2016-02-11 10:58:22

Question:

Hi,

When I use my largest dataset with MLlib (ALS), I repeatedly run into this error:

The dataset has 3 columns (user, movie, and rating) and 1,200,000 rows.

WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space

My machine currently has 8 cores, 240 GB of RAM, and a 100 GB disk (50 GB free).

I want to add more storage memory and more executors, so I set the following (I am using the Spyder IDE):

conf = SparkConf()
conf.set("spark.executor.memory", "40g")       # executor heap size
conf.set("spark.driver.memory", "20g")         # driver heap size
conf.set("spark.executor.cores", "8")          # cores per executor
conf.set("spark.num.executors", "16")          # intended number of executors
conf.set("spark.python.worker.memory", "40g")  # memory per Python worker
conf.set("spark.driver.maxResultSize", "0")    # no limit on collected result size
sc = SparkContext(conf=conf)

But I still see the same thing: the storage memory shown in the Spark UI stays at 511.1 MB.

What am I doing wrong?

This is how I start Spark (PySpark, from the Spyder IDE):

import sys
import os
from py4j.java_gateway import JavaGateway

# Py4J gateway to a running JVM
gateway = JavaGateway()

# Point at the local Spark 1.6.0 installation and make PySpark importable
os.environ['SPARK_HOME'] = "C:/Apache/spark-1.6.0"
sys.path.append("C:/Apache/spark-1.6.0/python/")

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

# Memory settings I am trying to apply
conf = SparkConf()
conf.set("spark.executor.memory", "25g")
conf.set("spark.driver.memory", "10g")
conf.set("spark.executor.cores", "8")
conf.set("spark.python.worker.memory", "30g")
conf.set("spark.driver.maxResultSize", "0")

sc = SparkContext(conf=conf)

Output:

16/02/12 18:37:47 INFO SparkContext: Running Spark version 1.6.0
16/02/12 18:37:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/12 18:37:48 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2136)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2136)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2136)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)
16/02/12 18:37:48 INFO SecurityManager: Changing view acls to: rmalveslocal
16/02/12 18:37:48 INFO SecurityManager: Changing modify acls to: rmalveslocal
16/02/12 18:37:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(rmalveslocal); users with modify permissions: Set(rmalveslocal)
16/02/12 18:37:48 INFO Utils: Successfully started service 'sparkDriver' on port 64280.
16/02/12 18:37:49 INFO Slf4jLogger: Slf4jLogger started
16/02/12 18:37:49 INFO Remoting: Starting remoting
16/02/12 18:37:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.10.5.105:64293]
16/02/12 18:37:49 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 64293.
16/02/12 18:37:49 INFO SparkEnv: Registering MapOutputTracker
16/02/12 18:37:49 INFO SparkEnv: Registering BlockManagerMaster
16/02/12 18:37:49 INFO DiskBlockManager: Created local directory at C:\Users\rmalveslocal\AppData\Local\Temp\1\blockmgr-4bd2f97f-8b4d-423d-a4e3-06f08ecdeca9
16/02/12 18:37:49 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/02/12 18:37:49 INFO SparkEnv: Registering OutputCommitCoordinator
16/02/12 18:37:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/02/12 18:37:50 INFO SparkUI: Started SparkUI at http://10.10.5.105:4040
16/02/12 18:37:50 INFO Executor: Starting executor ID driver on host localhost
16/02/12 18:37:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64330.
16/02/12 18:37:50 INFO NettyBlockTransferService: Server created on 64330
16/02/12 18:37:50 INFO BlockManagerMaster: Trying to register BlockManager
16/02/12 18:37:50 INFO BlockManagerMasterEndpoint: Registering block manager localhost:64330 with 511.1 MB RAM, BlockManagerId(driver, localhost, 64330)
16/02/12 18:37:50 INFO BlockManagerMaster: Registered BlockManager


Answer 1:

You didn't specify which deployment mode you are running (standalone, YARN, Mesos), but I assume you are using standalone mode (for a single server).

There are three concepts at play here:

- Worker node: a host that runs one or more executors.
- Executor: a container that hosts tasks.
- Task: a unit of work that runs in an executor (tasks make up the stages that together form a job; those two terms are not important for this discussion).

The default in standalone mode is to assign all available cores to a single executor. In your case you also set it to 8 explicitly, which equals all of your cores. The result is that you have one executor using all the cores, and since you also set the executor memory to 40 GB, you are only using a fraction of your memory for it (40 of 240).

You can either increase the executor memory so that more tasks can run in parallel (with more memory for each of them), or set the number of cores to 1 so that you can host 8 executors (in which case you will probably want to set the memory to a smaller number, since 8 * 40 = 320 GB is more than you have).
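
As an illustration of the second option (one core per executor, so that eight executors fit on the host), here is a minimal sketch using the same SparkConf API as in the question; the 25g figure is an illustrative assumption, not a tuned value:

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.executor.cores", "1")     # one core per executor, so up to 8 executors on this host
conf.set("spark.executor.memory", "25g")  # 8 * 25 GB = 200 GB, safely below the 240 GB of RAM
sc = SparkContext(conf=conf)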

Comments:

- Thanks. My idea was to have 1 core per worker and 2 executors inside the worker node, but if I understand you correctly I was wrong, and the best combination would be: conf.set("spark.executor.cores","1"), conf.set("spark.num.executors","8"), conf.set("spark.executor.memory","40g"). Am I right?
- 8 * 40 is too much (320 while you only have 240). You probably want spark.executor.cores = 8 and spark.executor.memory = 180/190G (you get concurrency from the 8 tasks running in the same executor, with less inter-process communication overhead). You also need to leave memory for the OS and for Python (which you set to 40G).
- @Arnon Rotem-Gal-Oz Thanks. So in my case spark.num.executors should be 8, right? Because I have 8 cores and 8 executors, right? (one executor using all the cores)
- The number of cores can be 8 (or left unset, since in standalone mode it defaults to all of them). I don't remember a num.executors setting, but even if there is one it wouldn't matter here: you have a single executor that uses the whole machine.
- I changed the configuration, but I still have the same problem and my storage memory has not changed (511.1 MB, as in the screenshot above in my question). Why doesn't Spark pick it up when I change the memory to 2 GB?
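
Putting the answerer's comment suggestion together, a hedged sketch of that configuration might look like the following (180g is the answerer's rough estimate, not a tested value, and the Python worker setting is carried over from the question):

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.executor.cores", "8")          # one executor running 8 concurrent tasks
conf.set("spark.executor.memory", "180g")      # most of the 240 GB, leaving room for the OS and Python
conf.set("spark.python.worker.memory", "40g")  # Python worker memory, as already set in the question
sc = SparkContext(conf=conf)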
