spark 大杂烩

Posted shaozhiqi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 大杂烩相关的知识,希望对你有一定的参考价值。

累加器

val dataRdd = sc.makeRDD(List(1, 2, 3, 4), 2)
var sum = 0
 //累加器可以收集driver和各个excuter中累加的结果
//如果此处删除累加器,用java的算法sum=sum+i那么结果是0,driver端的sum就是0,缺有无法得知各个excuter中加到了几
val accumulator = sc.longAccumulator
dataRdd.foreach({
      case i=>{
        sum=sum+i
        accumulator.add(i)
      }
 })
 println("sum = "+accumulator.value)
 //也可以自定义累加器加单词等,不只有long

序列化

//class Params(query:String) extends java.io.Serializable{
class Params(query: String) {

  //main中用到params对象的此方法会提示需要对象序列化
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  //main中用到params对象的此方法会提示需要对象序列化
  def getMatch1(rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  //main中用到params对象即使没有序列化也ok
  def getMatch2(rdd: RDD[String]): RDD[String] = {
    val q = query
    rdd.filter(x => x.contains(q))
  }
  
}

宽窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我形象的比喻为独生子女

宽依指的是多个子RDD的Partition会依赖同一个父RDD的 Partition,会引起shuffle.总结:宽依我们形象的比喻为超生

DAG

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的 Stage,对于窄依糗partition的转换处理在 Stage中完成算,对于宽依赖,由于有Shuffle的存在,只能在 parentRDD处理完成后,才能开始接下来的计算,因此宽依是划分Stage的依据。

技术图片

RDD任务划分

RDD任务切分中间分为: Application、job、 Stage和Task

1) Application:初始化一个 Spark Context I即生成一个 Application

2)Job:一个 Action算子就会生成一个Job

3) Stage:根据RDD之间的依赖关系的不同将Job划分成不同的 Stage,通到一个宽依赖则划分一个 Stage,

 技术图片

stage=1+发生shuffle的个数

task=每个stage的数据分区数之和= 5+3

RDD缓存

RDD通过 persist方法或 cache方法可以将前面的计算结果缓存,认情况下 persist()会把数据以序列化的形式缓存在jvm的堆究间中。

但是并不是这两个方法被调用时立即缓存,面是触发后面的 action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。并且血统中会指向这个缓存。当缓存丢失时就会重新计算。

RDD数据分区器

Spark目前支持Hash分区和 Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spak中分区直接决定了RDD中分区的个数、RDD中每条数据经过 Shuttle过程属于哪个分区和 Reduce 的个数

注意:(1)只有 Key-value类型的RDD才有分区器的,非Key- Value类型的RDD分区的值是Nono

     (2)每个RDD的分区ID范国:0- numPartition-1,决定这个值是所于那个分区的。

Hash分区器

Hashpartitioner分的原理:对于给定的key,计算其 hashcode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID

Range分区器

Hashpartitioner分区弊端:可能导致每个分区中数据量的不匀,校端情况下会导致某些分区拥有RDD的全部数据。

Rangepartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素背定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范国内的数映射到某一个分区内。

foeachPatirtion内

foeachPatirtion内datas.foreach是在excuter中执行,datas已是集合不是rdd,不涉及网路交互

Spark三大数据结构

RDD:分布式数据集

广播变量:分布式只读共享变量,广播变量的作用是将公用只读的集合广播出去,广播到excuter中,每个task读取自己excuter中的副本。注意广播内容的大小和excuter内存

累加器:分布式只写共亨变量,累加器可以将driver和excuter中的数据相加

 reduceBykey和 groupByKey的区别

1. reducebykey:按照key进行聚合,在 shuttle之前有 combine(顶聚合)操作,返回结果是 RDD[k,v]

2. geroupbykey:按照key进行分组,直接进行 shuffle

3.开发指导: reducebykey比 groupbykey,建议使用。但是需要注意是否会影响业务逻辑

 

 

 

 

以上是关于spark 大杂烩的主要内容,如果未能解决你的问题,请参考以下文章

spark 大杂烩

在这个 spark 代码片段中 ordering.by 是啥意思?

python+spark程序代码片段

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

分享几个实用的代码片段(附代码例子)

分享几个实用的代码片段(附代码例子)