Spark 案例实操

Posted 气质&末雨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 案例实操相关的知识,希望对你有一定的参考价值。

文章目录

Spark 案例实操

一、数据准备

在之前的学习中,我们已经学习了 Spark 的基础编程方式,接下来,我们看看在实际的工作中如何使用这些 API 实现具体的需求,这些需求是电商网站的真实需求,所以在实现功能前,我们必须先把数据准备好。

上面的数据是从数据文件中截取的一部分内容,表示为该电商网站的用户行为数据。主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下

  1. 数据文件中每行数据采用下划线分隔数据
  2. 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种
  3. 如果搜索关键字为null,表示数据不是搜索数据
  4. 如果点击的品类ID和产品ID为-1,表示数据不是点击数据
  5. 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示
  6. 支付行为和下单行为类似
    详细字段说明:
编号字段名称字段类型字段含义
1dateString用户点击行为的日期
2user_idLong用户的ID
3session_idStringSession的ID
4page_idLong某个页面的ID
5action_timeString动作的时间点(时间戳)
7click_category_id(品类ID)Long某一个商品品类的ID
8click_product_idLong某一个商品的ID
9order_category_ids(下单品类ID)String一次订单中所有品类的ID集合
10order_product_idsString一次订单中所有商品的ID集合
11pay_category_ids(支付品类ID)String一次支付中所有品类的ID集合
12pay_product_idsString一次支付中所有商品的ID集合
13city_idLong城市 id

二、案例需求

需求1:Top10 热门品类说明

需求说明

品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样,我们按照每个品类的点击,下单,支付的量来统计热门品类:
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数

例如:综合排名 = 点击数 * 20% + 下单数 * 30% + 支付数 * 50%
本项目需求优化为先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

代码示例

package com.atguigu.bigdata.spark.core.wc.anli

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

//Spark 案例
class Spark_anli1 


object Spark_anli1
  def main(args: Array[String]): Unit = 
    //TODO : Top10 热门品类
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark案例")
    val context = new SparkContext(conf)
    /*
    * 我们的需求规则:
    * `先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。`
    * */
    //1、读取原始的日志数据
    val rdd = context.textFile("D:\\\\BaiduNetdiskDownload\\\\尚硅谷spark.资料\\\\spark-core数据\\\\user_visit_action.txt")

    //2、统计品类的点击数量: (品类ID,点击数量)
    val clickActionRDD = rdd.filter( //点击行为的RDD
      action => 
        val date = action.split("_") //先按下划线进行分割
        date(6) != "-1" //date(6) 是品类ID,不能那个为-1因为为-1就不是点击行为了
      
    )
    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => 
        val date = action.split("_")
        (date(6), 1) //品类ID,然后1 这样进行统计,下一步就是聚合了,就可以看出点击数了
      
    ).reduceByKey(_ + _)

    //3、统计品类的下单数量:(品类ID,下单数量)
    val orderActionRDD = rdd.filter( //下单行为的RDD
      action => 
        val date = action.split("_") //先按下划线进行分割
        date(8) != "null" //date(8) 是下单品类ID,不能那个为null因为为null就不是下单品类行为了
      
    )
    //orderid => 1,2,3 由于这个下单数量可能是多个,用逗号连接起来的,所以我们要进行扁平化拆分出来
    //[1,2,3]
    val orderCountRDD = orderActionRDD.flatMap(
      action => 
        val date = action.split("_")
        val cid = date(8) //这是下单品类id,然后里面可能有几个
        val cids = cid.split(",") //然后我们用逗号分隔开
        cids.map(id => (id, 1)) //最后用map 转换成这个样子
      
    )

    //4、统计品类的支付数量:(品类ID,支付数量)
    val payActionRDD = rdd.filter( //下单支付行为的RDD
      action => 
        val date = action.split("_") //先按下划线进行分割
        date(10) != "null" //date(10) 是下单支付品类ID,也是不能那个为null
      
    )
    val payCountRDD = payActionRDD.flatMap(
      action => 
        val date = action.split("_")
        val cid = date(10) //这是下单品类id,然后里面可能有几个
        val cids = cid.split(",") //然后我们用逗号分隔开
        cids.map(id => (id, 1)) //最后用map 转换成这个样子
      
    )
    //5、将品类进行排序,并且取前十名
    //  点击数量排序,下单数量排序,支付数量排序
    //  Scala 元组排序:先比较第一个,再比较第二个,再比较第三个,以此类推,用来做这个再合适不过了
    // 要是在第5步的时候,把数据变成这样(品类ID,(点击数量,下单数量,支付数量))  就很好搞了
    //把不同数据源的数据连在一块,想想之前学的,
    //join,还有zip,leftOuterJoin,cogroup(分组+链接)
    //join是不行的,因为join是两个数据源中相同的key,zip也不行,zip的链接方法是和这个没什么关系
    //所以最终只能用cogroup,相当于是两个数据源,第一个数据源中相同key的数据分到一个组中,
    // 然后第二个数据源中相同的key的数据分到一个组中,然后给他们。
    // 装到一个大的元组里面,第一个元素是key,然后第二个元素是一个小元组,然后把两个分组装到里面,输出出来。
    //(品类ID,(点击数量,下单数量,支付数量)) //这样三个都连在一起了
    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(orderCountRDD, payCountRDD)

    val result =  cogroupRDD.mapValues
      case (clickIter,orderIter,payIter) =>  //因为上面是迭代器,我们不需要,只需要要数量,所以改一下
        var clickCnt = 0
        val iter1 = clickIter.iterator
        if(iter1.hasNext)
          clickCnt = iter1.next()
        

        var orderCnt = 0
        val iter2 = orderIter.iterator
        if(iter2.hasNext)
          orderCnt = iter2.next()
        

        var payCnt = 0

        val iter3 = payIter.iterator
        if(iter3.hasNext)
          payCnt = iter3.next()
        
        (clickCnt,orderCnt,payCnt)
      

    
    //这是最终的排序sortBy false是降序,取前十
    val resultRDD = result.sortBy(_._2,false).take(10)


    //6、将结果采集到控制台

    resultRDD.foreach(println)
    context.stop()
  

代码优化(aex)

我们发现,之前有太多的reduceByKey() 聚合,影响效率,而且那个合并,cogroup()性能可能会比较低,存在shuffle,所以我们在一开始的时候,就可以改变结构,把数据变为(key,(0,0,0)) 变成这个样子,然后把三个rdd使用unon()合并在一起,然后再进行reduceByKey()进行两两聚合,这样效率就高的多了,代码量也少了。最后进行排序就可以了,倒序,取前十,就可以了。

package com.atguigu.bigdata.spark.core.wc.anli

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

class Spark_anli1_3 


object Spark_anli1_3
  def main(args: Array[String]): Unit = 
    //TODO : Top10 热门品类
    //RDD 被重复使用
    //cogroup性能可能比较低,可能会存在shuffle
    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark案例")
    val context = new SparkContext(conf)
    /*
    * 我们的需求规则:
    * `先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。`
    * */
    //1、读取原始的日志数据
    val rdd = context.textFile("D:\\\\BaiduNetdiskDownload\\\\尚硅谷spark.资料\\\\spark-core数据\\\\user_visit_action.txt")

    //获取商品的点击数量
    val cikleRDD = rdd.filter(
      action => 
        val date = action.split("_")
        date(6) != "-1"
      
    )
    val clickCount = cikleRDD.map (
      action => 
        val date = action.split("_")
        (date(6),(1,0,0))
      
    )

    //获取商品的下单数量
    val proRDD = rdd.filter(
      action => 
        val date = action.split("_")
        date(8) != "null"
      
    )
    val proCount = proRDD.flatMap(
      action => 
        val date = action.split("_")
        val cid = date(8)
        val cids = cid.split(",")
        cids.map(
          id => (id,(0,1,0))
        )
      
    )

    val payRDD = rdd.filter(
      action => 
        val date = action.split("_")
        date(10) != "null"
      
    )
    val payCount = payRDD.flatMap(
      action => 
        val date = action.split("_")
        val cid = date(10)
        val cids = cid.split(",")
        cids.map(
          id => 
            (id, (0, 0, 1))
          
        )
      
    )

    val result = clickCount.union(proCount).union(payCount)
    val reduceRDD = result.reduceByKey(
      (t1, t2) => 
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      
    )
    val sortBYRDD = reduceRDD.sortBy(_._2, false).take(10)
    sortBYRDD.foreach(println)
  

还有一种极简方式,用if,else直接判断,都不用合并了,判断完直接两两聚合,然后进行排序,取前十。

package com.atguigu.bigdata.spark.core.wc.anli

import org.apache.spark.SparkConf, SparkContext
//这是第三种方法的第二种方法,可以说是极简了
class Spark_anli1_3_2 


object Spark_anli1_3_2
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val context = new SparkContext(conf)

    val rdd = context.textFile("D:\\\\BaiduNetdiskDownload\\\\尚硅谷spark.资料\\\\spark-core数据\\\\user_visit_action.txt")
    val result = rdd.flatMap(
      action => 
        val date = action.split("_")
        if (date(6) != "-1") 
          List((date(6), (1, 0, 0)))
         else if (date(8) != "null") 
          val cids = date(8).split(",")
          cids.map(id => (id, (0, 1, 0)))
         else if (date(10) != "null") 
          val cids = date(10).split(",")
          cids.map((id => (id, (0, 0, 1))))
         else 
          Nil
         
      
    )
    val reduceRDD = result.reduceByKey(
      (t1, t2) => 
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      
    )
    val sortRDD = reduceRDD.sortBy(_._2, false).take(10)
    sortRDD.foreach(println)
    context.stop()

  

以上是关于Spark 案例实操的主要内容,如果未能解决你的问题,请参考以下文章

HBase实操 | 使用Spark通过BulkLoad快速导入数据到HBase

Hadoop大数据技术Yarn 案例实操

大数据之Hadoop(MapReduce):Combiner合并案例实操

mysql练习案例(实操)

Hive Sql/ Spark Sql 数据倾斜优化方案

MapReduceWordCount 案例实操