optimization & error -01
Posted satyrs
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了optimization & error -01相关的知识,希望对你有一定的参考价值。
- 调优都是在场景限制之下。大部分选择并非一定。做测试来寻找瓶颈。(shuffle操作数量、RDD持久化操作数量以及gc)
- 开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。
(涉及代码质量(api及数据结构),参数,数据质量,考虑内存与网络而选择的模式(广播、序列化),官网建议)
- RDD(主要是persist,以及代码质量)
一个RDD,重复使用,persist选择方式。persist(StorageLevel.MEMORY_AND_DISK_SER),选择时注意JVM的OOM异常。
ER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。
_SER后缀表示,使用序列化的方式来保存RDD数据。避免发生频繁GC。
后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。网络开销。
- shuffle算子 (几个api根据使用情境的选择)
0 reduceByKey、join、distinct、repartition都减少使用。
1 map-side预聚合(见下)。
2 groupByKey
.reduceByKey(_ + _)
-----------------------
.groupByKey().map(t => (t._1, t._2.sum))//都是对ParisRDD操作
reduceByKey每个机器各自统计之后,lamdba再将结果reduce成最终结果。
groupByKey则所有相关kv pair都通过网络传输移动(shffle)。因此groupByKey移动数据时,若数据量大于单台执行机器内存总量,spark将数据保存磁盘,影响性能。保存时需要处理一个key的数据,当单个 key 的键值对超过内存容量会存在内存溢出的异常。
主要因groupByKey执行移动数据到执行机器时数据远大于reduceByKey。reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
类似问题涉及shuffle还可使用小表的广播(见下)。
3 mapPartition
对大数据进行遍历,使用mapPartition而不是map,因为mapPartition是在每个partition中进行操作,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。也可以减少遍历时新建broadCastMap.value对象的空间消耗,同时匹配不到的数据也不会返回()。
4 foreachPartitions代替foreach
比如在foreach函数中,将RDD中所有数据写mysql,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
5 filter后进行coalesce操作
使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。filter后partition可能过多,而数据较少,造成浪费,coalesce减少partition数目。
6 repartitionAndSortWithinPartitions替代repartition与sort类操作
如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
- 广播 (外部大变量,)
算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。
主要为了避免spark将外部变量制作多个副本到task级别通过网络传输到各个节点,以及内存占用导致频繁GC。广播则task共享,副本只到executor级别。
因为:在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
- kryo (序列化场景下)
使用场景:
- 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
- 将自定义的类型作为RDD的泛型类型时(比如JavaRDD),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
- 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
- 内存 (观察内存以及避免OOM)
1 检测对象内存消耗:创建RDD,然后放到cache里面去,然后在UI上面看storage的变化;或使用SizeEstimator来估算。
2 -XX:+UseCompressedOops选项可以压缩指针(8字节变成4字节)
“ Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 17.0 failed 4 times, most recent failure: Lost task 12.3 in stage 17.0 (TID 1257, ip-10-184-192-56.ec2.internal): ExecutorLostFailure (executor 79 lost)
之前使用groupBy时也出现过类似问题,删除limit语句后可以运行。也可从spark-defaults.conf文件中修改。改为spark.serializer org.apache.spark.serializer.JavaSerializer。
3 reduce task在当shuffle出现:比如sortByKey、groupByKey、reduceByKey和join等,需要hash table,消耗内存。解决办法:
- 增大level of parallelism,这样每个task的输入规模就相应减小
- 注意shuffle的内存上限设置。shuffle内存不够则性能变低。大量数据join等操作shuffle的内存上限经常配置到executor的50%。
4 当3中两个参数无法解决性能问题:考虑原始input大小,则必须增加内存。EC2机器类型选择考虑瓶颈。cache若不能改善性能则避免使用。
- 网络 (bug--序列化及心跳时间)
1 executor或task失去问题,或出现timeout,解决:
- spark.network.timeout,设置高些,300或480。默认为2分钟。
- spark.core.connection.ack.wait.timeout
spark.akka.timeout
spark.storage.blockManagerSlaveTimeoutMs
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout or spark.rpc.lookupTimeout
2 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace:
When kryo serialization is used, the query fails when ORDER BY and LIMIT is combined. After removing either ORDER BY or LIMIT clause, the query also runs.
- 数据倾斜 (数据清洗类问题)
1 原因:
- (参数设置)常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作。同样与内存问题一样涉及到并发度不够问题。
- (数据质量)key设置不合理或有大量空值,或分布不均,或与数据关联性过大等等。数据有误。
- (程序质量)逻辑有误。
2 性能下降,executor上过多数据,出现OOM
3 数据质量导致,解决:抽样key
df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)//key过多也可选多些
多数因为null值或无意义()引起 (过滤),大量重复的测试数据 (过滤),业务的有效数据正常分布。
4 业务逻辑导致,解决步骤:
- 将原始的
key
转化为key + 随机值
(例如Random.nextInt) - 对数据进行
reduceByKey(func)
- 将
key + 随机值
转成key
- 再对数据进行
reduceByKey(func)
tip1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。tips2: 单独处理异常数据时,可以配合使用Map Join解决。
5 参数设置,解决:
- 并行度问题:
spark.sql.shuffle.partitions
参数控制shuffle的并发度,默认为200。 rdd操作可以设置spark.default.parallelism
控制并发度,默认参数由不同的Cluster Manager控制。(不能根本解决) - reduce-side-join问题:表较小时,可以map join代替reduce join。自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程。提升十几甚至几十倍。属于在海量数据中匹配少量特定数据问题。主要思想是使用broadcast在各节点本地完成,spark.sql.autoBroadcastJoinThreshold设置大于表。
val df = ... df.cache.count df.registerTempTable("ipTable") sqlContext.sql("select * from (select * from ipTable)a join (select * from hist)b on a.ip = b.ip")
- 数据结构优化
字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型
- 参数 (为了充分利用集群资源)(spark开头的参数submit时使用--conf)
num-executors 默认值为少量。过多资源不够,过少性能不高。50-100
executor-memory 4-8g,申请的num-executors乘以executor-memory总内存1/3-1/2左右
executor-cores 同上
driver-memory 考虑到collect即可,一般1g即可
spark.default.parallelism 每个stage的默认task数量。500-1000左右
Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。
通常来说,Spark默认设置的数量是偏少的(比如就几十个task),Executor没有task执行,浪费资源。num-executors * executor-cores的2~3倍较为合适。
spark.storage.memoryFraction 默认是0.6。默认Executor 60%的内存,可以用来保存持久化的RDD数据。考虑GC以及操作类型。
shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。
如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
spark.shuffle.memoryFraction 默认是0.2。Executor默认只有20%的内存用来进行shuffle中聚合操作。
设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例。
以上是关于optimization & error -01的主要内容,如果未能解决你的问题,请参考以下文章
Error: webpack.optimize.CommonsChunkPlugin has been removed, please use config.optimization.splitChu
使用 scipy.optimize.curve_fit - ValueError 和 minpack.error 拟合 2D 高斯函数
php artisan optimize NULL.ERROR: Symfony\Component\Debug\Exception\FatalThrowableError: Call to unde
Error: webpack.optimize.UglifyJsPlugin has been removed, please use config.optimizat
iOS打包上传ipa文件时,报错<ERROR ITMS-90096: "Your binary is not optimized for iPhone 5 - New iPhone a