optimization & error -01

Posted satyrs

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了optimization & error -01相关的知识,希望对你有一定的参考价值。

  • 调优都是在场景限制之下。大部分选择并非一定。做测试来寻找瓶颈。(shuffle操作数量、RDD持久化操作数量以及gc)
  • 开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。

(涉及代码质量(api及数据结构),参数,数据质量,考虑内存与网络而选择的模式(广播、序列化),官网建议)

 

  • RDD(主要是persist,以及代码质量)

一个RDD,重复使用,persist选择方式。persist(StorageLevel.MEMORY_AND_DISK_SER),选择时注意JVM的OOM异常。

ER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。

_SER后缀表示,使用序列化的方式来保存RDD数据。避免发生频繁GC。

后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。网络开销。

 

 

  • shuffle算子 (几个api根据使用情境的选择)

0  reduceByKey、join、distinct、repartition都减少使用。

1  map-side预聚合(见下)。

2  groupByKey

.reduceByKey(_ + _)
-----------------------
.groupByKey().map(t => (t._1, t._2.sum))//都是对ParisRDD操作

 

reduceByKey每个机器各自统计之后,lamdba再将结果reduce成最终结果。groupByKey则所有相关kv pair都通过网络传输移动(shffle)。因此groupByKey移动数据时,若数据量大于单台执行机器内存总量,spark将数据保存磁盘,影响性能。保存时需要处理一个key的数据,当单个 key 的键值对超过内存容量会存在内存溢出的异常。

主要因groupByKey执行移动数据到执行机器时数据远大于reduceByKey。reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

类似问题涉及shuffle还可使用小表的广播(见下)。

3  mapPartition

对大数据进行遍历,使用mapPartition而不是map,因为mapPartition是在每个partition中进行操作,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。也可以减少遍历时新建broadCastMap.value对象的空间消耗,同时匹配不到的数据也不会返回()。

4  foreachPartitions代替foreach

比如在foreach函数中,将RDD中所有数据写mysql,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。

5  filter后进行coalesce操作

使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。filter后partition可能过多,而数据较少,造成浪费,coalesce减少partition数目。

6  repartitionAndSortWithinPartitions替代repartition与sort类操作

如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

 

  • 广播 (外部大变量,)

算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。

主要为了避免spark将外部变量制作多个副本到task级别通过网络传输到各个节点,以及内存占用导致频繁GC。广播则task共享,副本只到executor级别。

因为:在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。

 

  • kryo (序列化场景下)

使用场景:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
  2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

 

 

 

  • 内存 (观察内存以及避免OOM)

1  检测对象内存消耗:创建RDD,然后放到cache里面去,然后在UI上面看storage的变化;或使用SizeEstimator来估算。

2  -XX:+UseCompressedOops选项可以压缩指针(8字节变成4字节)

“ Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 17.0 failed 4 times, most recent failure: Lost task 12.3 in stage 17.0 (TID 1257, ip-10-184-192-56.ec2.internal): ExecutorLostFailure (executor 79 lost)

 

之前使用groupBy时也出现过类似问题,删除limit语句后可以运行。也可从spark-defaults.conf文件中修改。改为spark.serializer org.apache.spark.serializer.JavaSerializer。

3  reduce task在当shuffle出现:比如sortByKey、groupByKey、reduceByKey和join等,需要hash table,消耗内存。解决办法:

  1. 增大level of parallelism,这样每个task的输入规模就相应减小
  2. 注意shuffle的内存上限设置。shuffle内存不够则性能变低。大量数据join等操作shuffle的内存上限经常配置到executor的50%。

4  当3中两个参数无法解决性能问题:考虑原始input大小,则必须增加内存。EC2机器类型选择考虑瓶颈。cache若不能改善性能则避免使用。

 

 

  • 网络  (bug--序列化及心跳时间)

1  executor或task失去问题,或出现timeout,解决:

  1. spark.network.timeout,设置高些,300或480。默认为2分钟。
  2. spark.core.connection.ack.wait.timeout 
    spark.akka.timeout 
    spark.storage.blockManagerSlaveTimeoutMs 
    spark.shuffle.io.connectionTimeout 
    spark.rpc.askTimeout or spark.rpc.lookupTimeout

2  com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace:

When kryo serialization is used, the query fails when ORDER BY and LIMIT is combined. After removing either ORDER BY or LIMIT clause, the query also runs.

 

 

 

  • 数据倾斜 (数据清洗类问题)

1  原因:

  1. (参数设置)常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作。同样与内存问题一样涉及到并发度不够问题。
  2. (数据质量)key设置不合理或有大量空值,或分布不均,或与数据关联性过大等等。数据有误。
  3. (程序质量)逻辑有误。

2  性能下降,executor上过多数据,出现OOM

3  数据质量导致,解决:抽样key

df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)//key过多也可选多些

 

多数因为null值或无意义()引起 (过滤),大量重复的测试数据  (过滤),业务的有效数据正常分布。

 

4  业务逻辑导致,解决步骤:

  1. 将原始的 key 转化为 key + 随机值(例如Random.nextInt) 
  2. 对数据进行 reduceByKey(func)
  3. 将 key + 随机值 转成 key
  4. 再对数据进行 reduceByKey(func)

tip1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。tips2: 单独处理异常数据时,可以配合使用Map Join解决。

5  参数设置,解决:

    1. 并行度问题:spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。 rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。(不能根本解决)
    2. reduce-side-join问题:表较小时,可以map join代替reduce join。自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程。提升十几甚至几十倍。属于在海量数据中匹配少量特定数据问题。主要思想是使用broadcast在各节点本地完成,spark.sql.autoBroadcastJoinThreshold设置大于表。

 

val df = ...
df.cache.count
df.registerTempTable("ipTable")
sqlContext.sql("select * from (select * from ipTable)a join (select * from hist)b on a.ip = b.ip")

 

 

  • 数据结构优化 

字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型

 

  • 参数 (为了充分利用集群资源)(spark开头的参数submit时使用--conf)

num-executors 默认值为少量。过多资源不够,过少性能不高。50-100

executor-memory 4-8g,申请的num-executors乘以executor-memory总内存1/3-1/2左右

executor-cores 同上

driver-memory  考虑到collect即可,一般1g即可

spark.default.parallelism  每个stage的默认task数量。500-1000左右

Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。

通常来说,Spark默认设置的数量是偏少的(比如就几十个task),Executor没有task执行,浪费资源。num-executors * executor-cores的2~3倍较为合适。

spark.storage.memoryFraction 默认是0.6。默认Executor 60%的内存,可以用来保存持久化的RDD数据。考虑GC以及操作类型。

shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。

如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction  默认是0.2。Executor默认只有20%的内存用来进行shuffle中聚合操作。

设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例。

 

以上是关于optimization & error -01的主要内容,如果未能解决你的问题,请参考以下文章

Error: webpack.optimize.CommonsChunkPlugin has been removed, please use config.optimization.splitChu

使用 scipy.optimize.curve_fit - ValueError 和 minpack.error 拟合 2D 高斯函数

php artisan optimize NULL.ERROR: Symfony\Component\Debug\Exception\FatalThrowableError: Call to unde

Error: webpack.optimize.UglifyJsPlugin has been removed, please use config.optimizat

Optimization & Map

iOS打包上传ipa文件时,报错<ERROR ITMS-90096: "Your binary is not optimized for iPhone 5 - New iPhone a