spark 资源大小分配与并行处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 资源大小分配与并行处理相关的知识,希望对你有一定的参考价值。
参考技术A 写这篇博客的起因我在跑一个spark job时,有时能跑通,有时跑不通。程序的需求是对比两个hbase表。程序逻辑很简单,分别从两个hbase表读取全量数据,然后以cogroup二者,对比同一个rowkey下每个列是否一致。跑不通的错误日志如下:
17/02/25 21:24:20 INFO collection.ExternalAppendOnlyMap: Thread 1896 spilling in-memory map of 83.6 MB to disk (46 times so far)
17/02/25 21:24:22 WARN server.TransportChannelHandler: Exception in connection from /10.110.1.57:57832
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
……
17/02/25 21:24:22 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccessstreamChunkId=StreamChunkIdstreamId=1258210057016, chunkIndex=0, buffer=FileSegmentManagedBufferfile=/data-g/hadoop/yarn/local-dir/usercache/test/appcache/application_1466564207556_1562806/blockmgr-ebe23f0d-5a9e-4a37-952b-73bfab6cceed/3f/shuffle_0_6_0.data, offset=474965639, length=95049579 to /10.130.1.27:53263; closing connection
java.nio.channels.ClosedChannelException
17/02/25 21:24:22 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from c1-hd-dn8.bdp.idc/10.130.1.27:50014 is closed
17/02/25 21:24:22 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
17/02/25 21:24:22 INFO collection.ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 84.1 MB to disk (44 times so far)
17/02/25 21:24:23 INFO collection.ExternalAppendOnlyMap: Thread 1895 spilling in-memory map of 83.9 MB to disk (47 times so far)
17/02/25 21:24:27 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
java.io.IOException: Failed to connect to someHost/someIP:50004
……
Caused by: java.net.ConnectException: Connection refused: someHost/someIP:50004
……
针对上面这个问题,做了相关的尝试,解决了以下几个问题:
(1)运行spark job该分配多少资源,即我们该分配多少个executor?每个executor分配多少内存、多少个core?
(2)spark job 的并行度由什么因素决定?
(3)为什么yarn UI也的executor显示的used memory内存大小比配置的内存小?
该分配多少资源主要看输入量的大小、资源计算的复杂度。一般瓶颈会在shuffle阶段,如果执行某个shuffle的task内存不足,那很可能会跑不下去,程序挂掉。
spark中的计算任务都是一个个task单独执行,executor内存越多,单个task执行时内存越足,执行越顺利。 executor越多,core越多,可并行执行的task数目也就越多。假如总共100个task,5个executor,4个core,那么平均需要执行100/(5*4) = 5个批次;如果是2个executor,4个core,那么需要执行100/(2*4) = 13个批次。
core的数量一般根据内存大小和机器物理核数来定。最好不要超过物理核数。如果executor内存是4G,分配了4个core,那么每个core只有4G/4 = 1G内存。所以core不宜太大,如果太大,每个task执行时的内存将会变小,影响正常执行。
举个例子,我们的输入是两个hbase表,均为3.5G。shuffle阶段两个表会根据rowkey 做join,会产生几十G的shuffle数据。我们这样设置资源:
--driver-memory 1g \
--executor-memory 4g \
--num-executors 6 \
--executor-cores 4 \
并行度分为理论上最大的并行度和实际执行的并行度两种,“理论上”指的是总共的partition数目,一个partition对应一个task执行,如果数据有100个partition,那么理论上并行度最高可以达到100。“实际执行”指的是这些task实际分到executor各个core执行时的并行度。加入有100个partition,但是分配的资源只有10个executor,每个executor2个core,那么他们的并行度是10*2=40, 实际执行时会分批执行,分为100/(10*2) = 5批。我们一般讨论的并行度是理论上的并行度。
并行度(partition数目)由初始数据大小、初始数据类型,程序中设定的numPartitions大小,分配资源的executor、core数目共同决定。 并行度一般在shuffle时发生改变,如果未设定,则默认取上一个stage中最大的partition数目作为当前stage的并行度。所以如果不做设定,那么并行度与初始数据的并行度紧密相关。
1.初始数据文件类型因素
如果读入的数据为hdfs文件,那么默认的并行度是block数量。block大小默认是64MB或128MB。
如果读入的数据是hbase表,那么默认的并行度是表的region数目。
2.人为设定numPartitions
如果人为的在读取数据或者在shuffle类算子中设定numPartitions,那么整体的并行度将会以人为设定的为准。
3.人为设定spark.default.parallelism
spark.default.parallelism参数是全局的,优先级低于人为设定的numPartiton。在shuffle时,如果没有设定numPartiton,那么将为以spark.default.parallelism设定的数目作为并行度。
4.系统默认的spark.default.parallelism
系统默认的spark.default.parallelism = executor数目*core数目
以上4个因素的优先级:
1.numPartitions参数 > 2. spark.default.parallelism参数 > 3. 读取初始文件产生的并行度
spark中的内存分为多个部分,UI页面上显示的只是缓存RDD用的storage memory,大约是(总内存 - 300M) * 60% * 50% 的量,所以会偏小。具体内存分配如下图:
以上。
Spark性能调优之合理设置并行度
Spark性能调优之合理设置并行度
1.Spark的并行度指的是什么?
task没有设置,或者设置的很少,比如就设置了,100个task 。 50个executor ,每个executor 有3个core ,也就是说
Application 任何一个stage运行的时候,都有总数150个cpu core ,可以并行运行。但是,你现在只有100个task ,平均分配一下,每个executor 分配到2个task,ok,那么同时在运行的task,只有100个task,每个executor 只会并行运行 2个task。 每个executor 剩下的一个cpu core 就浪费掉了!你的资源,虽然分配充足了,但是问题是, 并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。合理的并行度的设置,应该要设置的足够大,大到可以完全合理的利用你的集群资源; 比如上面的例子,总共集群有150个cpu core ,可以并行运行150个task。那么你就应该将你的Application 的并行度,至少设置成150个,才能完全有效的利用你的集群资源,让150个task ,并行执行,而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数量变少; 比如总共 150G 的数据要处理, 如果是100个task ,每个task 要计算1.5G的数据。 现在增加到150个task,每个task只要处理1G数据。
2.如何去提高并行度?
1、task数量,至少设置成与spark Application 的总cpu core 数量相同(最理性情况,150个core,分配150task,一起运行,差不多同一时间运行完毕)官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300~ 500. 与理性情况不同的,有些task 会运行快一点,比如50s 就完了,有些task 可能会慢一点,要一分半才运行完,所以如果你的task数量,刚好设置的跟cpu core 数量相同,可能会导致资源的浪费,因为 比如150task ,10个先运行完了,剩余140个还在运行,但是这个时候,就有10个cpu core空闲出来了,导致浪费。如果设置2~3倍,那么一个task运行完以后,另外一个task马上补上来,尽量让cpu core不要空闲。同时尽量提升spark运行效率和速度。提升性能。
2、如何设置一个Spark Application的并行度?
spark.defalut.parallelism 默认是没有值的,如果设置了值比如说10,是在shuffle的过程才会起作用(val rdd2 = rdd1.reduceByKey(_+_) //rdd2的分区数就是10,rdd1的分区数不受这个参数的影响)
new SparkConf().set(“spark.defalut.parallelism”,”“500)
以上是关于spark 资源大小分配与并行处理的主要内容,如果未能解决你的问题,请参考以下文章