Spark 案例实操
Posted 气质&末雨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 案例实操相关的知识,希望对你有一定的参考价值。
文章目录
Spark 案例实操
一、数据准备
在之前的学习中,我们已经学习了 Spark 的基础编程方式,接下来,我们看看在实际的工作中如何使用这些 API 实现具体的需求,这些需求是电商网站的真实需求,所以在实现功能前,我们必须先把数据准备好。
上面的数据是从数据文件中截取的一部分内容,表示为该电商网站的用户行为数据。主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:
- 数据文件中每行数据采用
下划线
分隔数据 - 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种
- 如果搜索关键字为null,表示数据不是搜索数据
- 如果点击的品类ID和产品ID为-1,表示数据不是点击数据
- 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用
逗号
分隔,如果本次不是下单行为,则数据采用null
表示 - 支付行为和下单行为类似
详细字段说明:
编号 | 字段名称 | 字段类型 | 字段含义 |
---|---|---|---|
1 | date | String | 用户点击行为的日期 |
2 | user_id | Long | 用户的ID |
3 | session_id | String | Session的ID |
4 | page_id | Long | 某个页面的ID |
5 | action_time | String | 动作的时间点(时间戳) |
7 | click_category_id(品类ID) | Long | 某一个商品品类的ID |
8 | click_product_id | Long | 某一个商品的ID |
9 | order_category_ids (下单品类ID) | String | 一次订单中所有品类的ID集合 |
10 | order_product_ids | String | 一次订单中所有商品的ID集合 |
11 | pay_category_ids (支付品类ID) | String | 一次支付中所有品类的ID集合 |
12 | pay_product_ids | String | 一次支付中所有商品的ID集合 |
13 | city_id | Long | 城市 id |
二、案例需求
需求1:Top10 热门品类说明
需求说明
品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样,我们按照每个品类的点击,下单,支付的量来统计热门品类:
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数
例如:综合排名 = 点击数 * 20% + 下单数 * 30% + 支付数 * 50%
本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
代码示例
package com.atguigu.bigdata.spark.core.wc.anli
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
//Spark 案例
class Spark_anli1
object Spark_anli1
def main(args: Array[String]): Unit =
//TODO : Top10 热门品类
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark案例")
val context = new SparkContext(conf)
/*
* 我们的需求规则:
* `先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。`
* */
//1、读取原始的日志数据
val rdd = context.textFile("D:\\\\BaiduNetdiskDownload\\\\尚硅谷spark.资料\\\\spark-core数据\\\\user_visit_action.txt")
//2、统计品类的点击数量: (品类ID,点击数量)
val clickActionRDD = rdd.filter( //点击行为的RDD
action =>
val date = action.split("_") //先按下划线进行分割
date(6) != "-1" //date(6) 是品类ID,不能那个为-1因为为-1就不是点击行为了
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action =>
val date = action.split("_")
(date(6), 1) //品类ID,然后1 这样进行统计,下一步就是聚合了,就可以看出点击数了
).reduceByKey(_ + _)
//3、统计品类的下单数量:(品类ID,下单数量)
val orderActionRDD = rdd.filter( //下单行为的RDD
action =>
val date = action.split("_") //先按下划线进行分割
date(8) != "null" //date(8) 是下单品类ID,不能那个为null因为为null就不是下单品类行为了
)
//orderid => 1,2,3 由于这个下单数量可能是多个,用逗号连接起来的,所以我们要进行扁平化拆分出来
//[1,2,3]
val orderCountRDD = orderActionRDD.flatMap(
action =>
val date = action.split("_")
val cid = date(8) //这是下单品类id,然后里面可能有几个
val cids = cid.split(",") //然后我们用逗号分隔开
cids.map(id => (id, 1)) //最后用map 转换成这个样子
)
//4、统计品类的支付数量:(品类ID,支付数量)
val payActionRDD = rdd.filter( //下单支付行为的RDD
action =>
val date = action.split("_") //先按下划线进行分割
date(10) != "null" //date(10) 是下单支付品类ID,也是不能那个为null
)
val payCountRDD = payActionRDD.flatMap(
action =>
val date = action.split("_")
val cid = date(10) //这是下单品类id,然后里面可能有几个
val cids = cid.split(",") //然后我们用逗号分隔开
cids.map(id => (id, 1)) //最后用map 转换成这个样子
)
//5、将品类进行排序,并且取前十名
// 点击数量排序,下单数量排序,支付数量排序
// Scala 元组排序:先比较第一个,再比较第二个,再比较第三个,以此类推,用来做这个再合适不过了
// 要是在第5步的时候,把数据变成这样(品类ID,(点击数量,下单数量,支付数量)) 就很好搞了
//把不同数据源的数据连在一块,想想之前学的,
//join,还有zip,leftOuterJoin,cogroup(分组+链接)
//join是不行的,因为join是两个数据源中相同的key,zip也不行,zip的链接方法是和这个没什么关系
//所以最终只能用cogroup,相当于是两个数据源,第一个数据源中相同key的数据分到一个组中,
// 然后第二个数据源中相同的key的数据分到一个组中,然后给他们。
// 装到一个大的元组里面,第一个元素是key,然后第二个元素是一个小元组,然后把两个分组装到里面,输出出来。
//(品类ID,(点击数量,下单数量,支付数量)) //这样三个都连在一起了
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(orderCountRDD, payCountRDD)
val result = cogroupRDD.mapValues
case (clickIter,orderIter,payIter) => //因为上面是迭代器,我们不需要,只需要要数量,所以改一下
var clickCnt = 0
val iter1 = clickIter.iterator
if(iter1.hasNext)
clickCnt = iter1.next()
var orderCnt = 0
val iter2 = orderIter.iterator
if(iter2.hasNext)
orderCnt = iter2.next()
var payCnt = 0
val iter3 = payIter.iterator
if(iter3.hasNext)
payCnt = iter3.next()
(clickCnt,orderCnt,payCnt)
//这是最终的排序sortBy false是降序,取前十
val resultRDD = result.sortBy(_._2,false).take(10)
//6、将结果采集到控制台
resultRDD.foreach(println)
context.stop()
代码优化(aex)
我们发现,之前有太多的reduceByKey()
聚合,影响效率,而且那个合并,cogroup()
性能可能会比较低,存在shuffle
,所以我们在一开始的时候,就可以改变结构,把数据变为(key,(0,0,0))
变成这个样子,然后把三个rdd使用unon()
合并在一起,然后再进行reduceByKey()
进行两两聚合,这样效率就高的多了,代码量也少了。最后进行排序就可以了,倒序,取前十,就可以了。
package com.atguigu.bigdata.spark.core.wc.anli
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD
class Spark_anli1_3
object Spark_anli1_3
def main(args: Array[String]): Unit =
//TODO : Top10 热门品类
//RDD 被重复使用
//cogroup性能可能比较低,可能会存在shuffle
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark案例")
val context = new SparkContext(conf)
/*
* 我们的需求规则:
* `先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。`
* */
//1、读取原始的日志数据
val rdd = context.textFile("D:\\\\BaiduNetdiskDownload\\\\尚硅谷spark.资料\\\\spark-core数据\\\\user_visit_action.txt")
//获取商品的点击数量
val cikleRDD = rdd.filter(
action =>
val date = action.split("_")
date(6) != "-1"
)
val clickCount = cikleRDD.map (
action =>
val date = action.split("_")
(date(6),(1,0,0))
)
//获取商品的下单数量
val proRDD = rdd.filter(
action =>
val date = action.split("_")
date(8) != "null"
)
val proCount = proRDD.flatMap(
action =>
val date = action.split("_")
val cid = date(8)
val cids = cid.split(",")
cids.map(
id => (id,(0,1,0))
)
)
val payRDD = rdd.filter(
action =>
val date = action.split("_")
date(10) != "null"
)
val payCount = payRDD.flatMap(
action =>
val date = action.split("_")
val cid = date(10)
val cids = cid.split(",")
cids.map(
id =>
(id, (0, 0, 1))
)
)
val result = clickCount.union(proCount).union(payCount)
val reduceRDD = result.reduceByKey(
(t1, t2) =>
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
)
val sortBYRDD = reduceRDD.sortBy(_._2, false).take(10)
sortBYRDD.foreach(println)
还有一种极简方式,用if,else直接判断,都不用合并了,判断完直接两两聚合,然后进行排序,取前十。
package com.atguigu.bigdata.spark.core.wc.anli
import org.apache.spark.SparkConf, SparkContext
//这是第三种方法的第二种方法,可以说是极简了
class Spark_anli1_3_2
object Spark_anli1_3_2
def main(args: Array[String]): Unit =
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val context = new SparkContext(conf)
val rdd = context.textFile("D:\\\\BaiduNetdiskDownload\\\\尚硅谷spark.资料\\\\spark-core数据\\\\user_visit_action.txt")
val result = rdd.flatMap(
action =>
val date = action.split("_")
if (date(6) != "-1")
List((date(6), (1, 0, 0)))
else if (date(8) != "null")
val cids = date(8).split(",")
cids.map(id => (id, (0, 1, 0)))
else if (date(10) != "null")
val cids = date(10).split(",")
cids.map((id => (id, (0, 0, 1))))
else
Nil
)
val reduceRDD = result.reduceByKey(
(t1, t2) =>
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
)
val sortRDD = reduceRDD.sortBy(_._2, false).take(10)
sortRDD.foreach(println)
context.stop()
以上是关于Spark 案例实操的主要内容,如果未能解决你的问题,请参考以下文章
HBase实操 | 使用Spark通过BulkLoad快速导入数据到HBase