Spark框架—RDD分区和缓存

Posted 那人独钓寒江雪.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark框架—RDD分区和缓存相关的知识,希望对你有一定的参考价值。

Spark框架—RDD分区和缓存

AccessLogAgg.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.SparkConf, SparkContext
import org.junit.Test




class AccessLogAgg 
@Test
  def ipAgg(): Unit =
  def main(args: Array[String]): Unit = 
    //1.创建 SparkContext
    val conf: SparkConf =new SparkConf().setMaster("local[6]").setAppName("ip_agg")
    val sc = new SparkContext(conf)
    //2.读取文件,生成数据集
    val sourceRDD: RDD[String] =sc.textFile("codeStyles/java_error_in_IDEA_2046.log")
    //3.取出IP,赋予出现次数为1
    val ipRDD: RDD[Array[String]] =sourceRDD.map(item =>item.split(" "))
    //4.简单清洗
    //4.1去掉空的数据
    //4.2去掉非法的数据
    //4.3根据业务再规定整一个数据
    val cleanRDD: RDD[Array[String]] =ipRDD.filter(item => item.isEmpty)
    //5.根据IP出现的次数进行聚合
//    cleanRDD.reduceBykey((curr,agg) => curr + agg)
    //6.根据IP出现的次数进行排序

    //7.取出结果,打印结果

  



ActionOp.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
import org.junit.Test

class ActionOp 
  val conf: SparkConf =new SparkConf().setMaster("local[6]").setAppName("Trans_formationOp")
  val sc=new SparkContext(conf)
  /*
  *需求,最终生成("结果",price)
   */
  @Test
  /*
  *1.函数中传入的curr参数,并不是value,而是一整条数据
  * 2.reduce整体上的结果,只有一个
   */
  def reduce(): Unit =
    val rdd: RDD[(String, Double)] =sc.parallelize(Seq(("手机",10.0),("手机",15.0),("电脑",20.0),("排骨",13.0)))//不是按原来的顺序打印的
    val result: (String, Double) =rdd.reduce((curr, agg) => ("结果",curr._2+ agg._2 ))//agg是局部汇总结果 相加每次结果 curr也是汇总所有的数据
      println(result)//打印元祖
  

  @Test
  def count(): Unit =
    val rdd: RDD[(String, Int)] =sc.parallelize(Seq(("a",1),("b",2),("c",3),("c",4)))
    println(rdd.count())//元祖总数

    println(rdd.countByKey())//计算key的个数 返回的是map

  
  //返回的是数组形式
  //take和takeSample都散获取数据,一个是直接获取,一个是采样获取
  //first:一般情况下,action会从所有分区获取数据,相对来说数据就比较慢,first只是获取第一个元素,所以first只会处理第一个分区,取第一个数据的速度比take还要快
  @Test
  def take(): Unit =
    val rdd: RDD[Int] =sc.parallelize(Seq(1,2,3,4,5,6))
    rdd.take(3).foreach(item => println(item))//因为返回的是数组形式 所以用foreach
    println(rdd.first())
    rdd.takeSample(withReplacement = false,num = 3).foreach(println(_))//取三个数,没有重复值
  
  //数学计算
  //除了这四个支持意外,还有其他很多的支持,这些对于数字类型的支持都是Action
  @Test
  def numberic(): Unit =
    val rdd: RDD[Int] =sc.parallelize(Seq(1,2,3,4,10,20,30,50,100))
    println(rdd.max())
    println(rdd.min())
    println(rdd.mean())//求均值
    println(rdd.sum())
  
  


TransformationOp.scala

import junit.framework.Test
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
import org.junit
//collect一般都在定义的函数外面收集
class TransformationOp 
  val conf: SparkConf =new SparkConf().setMaster("local[6]").setAppName("Trans_formationOp")
  val sc=new SparkContext(conf)
/*mapPartitions 和map算子是一样的,只不过map是针对每一条数据进行转换,mapPartitions 针对一整个分区的数据进行转换
* 1.map的func的参数是单条数据,mapPartitions的fnc的参数是一个集合(一个分区中的整个所有的数据)
* 2.map的func返回值也是单条数据,mapPartitions返回值是一个集合
 */
  @junit.Test
  def mapPartitions(): Unit =
    //1.数据的生成
    //2.算子使用
    //3.获取结果
    sc.parallelize(Seq(1,2,3,4,5,6),2)
      .mapPartitions(iter=>
          iter.foreach(item => println(item))//放在迭代器内和外有啥区别
        iter//需要一个返回值
      )
      .collect()
  

  @junit.Test
  def mapPartitions1(): Unit = 
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitions(iter => 
        //遍历iter其中每一条数据进行转换,转换完以后,返回这个iter
        iter.map(item => item * 10)
        //返回值应该是定义foreach的变量
      )
        .collect()
        .foreach(item => println(item))//没有返回值

  
  @junit.Test
  def mapPartitionsWithIndex(): Unit =
    sc.parallelize(Seq(1,2,3,4,5,6),2)
      .mapPartitionsWithIndex((index,iter) =>
        println("index:"+index)
        iter.foreach(item=>println(item))
        iter
      )
      .collect()
  
  @junit.Test
  def map1(): Unit =
    sc.parallelize(Seq(1,2,3,4,5,6),2)
      .mapPartitionsWithIndex((index,iter)=>
          println("index:"+index)
        iter.map(item=> item *10)
        iter.foreach(item => println(item))
        iter
      )
      .collect()
  
  @junit.Test
  //1.定义集合
  //2.过滤数据
  //3.收集结果
  def filter(): Unit =  //filter相当于if结构
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
      .filter(item => item % 2 == 0)
      .collect()
      .foreach(item => println(item))
  

    //sample 算子可以从一个数据集中抽样出来一部分,常用于减小数据集以保证运行速度,并且尽可能少规律的损失
    //1.第一个参数值为投入哦,则抽样出来的数据集中可能会有重复,2.sample接受第二个参数意为抽样的比例,3.seed随机数种子,用于sample内部随即生成
  //1.为true是有放回的,2.为false是无放回的
  @junit.Test
  //1.定义集合
  //2.过滤数据
  //3.收集结果
  def sample(): Unit =
    //sample作用:把大数据集变小,尽可能的减少数据集规律的损失
     val rdd1: RDD[Int] =sc.parallelize(Seq(1,2,3,4,5,6,7,8,9,10),2)
      val rdd2: RDD[Int] =rdd1.sample(withReplacement = true,0.6)//第二个参数表示从10份里面抽取6份,true表示有返回值
      val result: Array[Int] =rdd2.collect()
      result.foreach(item => println(item))
  

  @junit.Test
  def mapValues(): Unit =
    sc.parallelize(Seq(("a",1),("b",2),("c",3),("d",4)))
      .mapValues(item => item * 10)
      .collect()
      .foreach(println(_))
  

  @junit.Test
  def intersection(): Unit =
    val rdd1: RDD[Int] =sc.parallelize(Seq(1,2,3,4,5))
    val rdd2: RDD[Int] =sc.parallelize(Seq(2,5,3,6))
    rdd1.intersection(rdd2)//交集
      .collect()
      .foreach(println(_))
  
  @junit.Test
  def union(): Unit =//并集
    val rdd1: RDD[Int] =sc.parallelize(Seq(1,2,3,4,5,6))
    val rdd2: RDD[Int] =sc.parallelize(Seq(3,7,8))
     rdd1.union(rdd2)
      .collect()
      .foreach(println(_))
  
  @junit.Test//差集
  def subtract(): Unit =
    val rdd1: RDD[Int] =sc.parallelize(Seq(1,2,3,4))
    val rdd2: RDD[Int] =sc.parallelize(Seq(1,3,4))
    rdd1.subtract(rdd2)
      .collect()
      .foreach(println(_))
  
  //groupByKey 能不能在map端做Combiner有没有意义?没有的  取出key值,按照Key分组,和ReduceByKey有点类似
  @junit.Test
  def groupByKey(): Unit =
    sc.parallelize(Seq(("a",1),("a",2),("b",1)))
      .groupByKey()
      .collect()
      .foreach(println(_))
  

  @junit.Test
  //计算集合内的平均值
  def combineByKey(): Unit =
    //1.准备集合
    val rdd: RDD[(String, Double)] =sc.parallelize(Seq(("zhangsan",99.0),("zhangsan",96.0),("lisi",97.0),("lisi",98.0),("zhangsan",97.0)))
    //2.算子操作
    //2.1createCombiner转换数据
    //2.2mergeValue分区上的聚合
    //2.3mergeCombiners把所有分区上的结果再次聚合,生成最终结果
    val combineResult: RDD[(String, (Double, Int))] =rdd.combineByKey(
      createCombiner = (curr:Double)=>(curr,1),
      mergeValue=(curr:(Double,Int),nextValue:Double) =>(curr._1+nextValue,curr._2+1),
      mergeCombiners = (curr:(Double,Int),agg:(Double,Int)) =>(curr._1+agg._1,curr._2+agg._2)
    )
    //("zhangsan",(99+96+97,3))
    val resultRDD: RDD[(String, Double)] =combineResult.map(item=>(item._1,item._2._1/item._2._2))

    //3.获取结果,打印结果
    resultRDD.collect().foreach(println(_))
  
  //foldByKey和spark中的reduceByKey的区别是可以指定初始值
  //foldByKey和scala中的foldLeft或者foldRight区别是,这个初始值作用于每一个数据,而foldLeft只作用一次

  @junit.Test
  def foldByKey(): Unit =
    sc.parallelize(Seq(("a",1),("a",1),("b",1)))
      .foldByKey(10)((curr,agg) => curr + agg)//agg是局部变量 10是单次增加不是全体增加
      .foreach(println(_))
  
  //zeroValue:指定初始值
  //seqOp:作用于每一个元素,根据初始值,进行计算
  //combOp:将seqOp处理过的结果进行聚合
  //aggregateByKey
  @junit.Test
  def aggregateByKey(): Unit = 
    val rdd: RDD[(String, Double)] = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
    rdd.aggregateByKey(0.8)((zeroValue, item) => item * zeroValue, (curr, agg) => curr + agg)
      .collect()
      .foreach(println(_))
  

  @junit.Test
  def join(): Unit =
    val rdd1: RDD[(String, Int)] =sc.parallelize(Seq(("a",1),("a",2),("b",1)))
    val rdd2: RDD[(String, Int)] =sc.parallelize(Seq(("a",10),("a",11),("b",12)))
    rdd1.join以上是关于Spark框架—RDD分区和缓存的主要内容,如果未能解决你的问题,请参考以下文章

Spark-core:Spark RDD的高级算子

对spark内存迭代计算框架的理解误区

Spark 如何驱逐缓存的分区?

大数据框架-spark

Spark RDD在Spark中的地位和作用如何?

Spark RDD Operations