Spark Data Processing and Analysis --- Retention Rate
Posted by Shall潇
Requirements analysis
- Log data cleaning
- User retention analysis
- Active user analysis
I. Data Cleaning
Sample data
1. Read the log file and convert it to RDD[Row]
- Split each line on Tab and filter out records with fewer than 8 fields
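As a purely hypothetical illustration (the log line below is invented; only the eight-field, tab-separated layout comes from the code that follows), the split-and-filter step behaves like this:
    // Hypothetical line with eight tab-separated fields:
    // event_time, url, method, status, sip, user_uip, action_prepend, action_client
    val sampleLine = "2018-09-04T20:34:45+08:00\t/www/1?userUID=123&actionName=Signin\tGET\t200\t192.168.1.10\t-\t-\tMozilla/5.0"
    val fields = sampleLine.split("\\t")   // Array of 8 strings
    val keep = fields.length == 8          // true, so this line passes the length filter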
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
/*
 * Data cleaning
 * */
object ETLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ETL")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    //1. Load the data
    val rdd = sc.textFile("in/test.log")
    //    println(rdd.count())

    //2. Keep only records with exactly 8 fields and wrap each one in a Row
    /*
     * event_time, url, method, status, sip, user_uip, action_prepend, action_client
     * */
    val rowRDD = rdd.map(_.split("\\t")).filter(x => x.length == 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

    //Define the schema
    val logs_schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )

    //Create the DataFrame
    val logDF = spark.createDataFrame(rowRDD, logs_schema)
  }
}
2. Clean the data
- Deduplicate on the first two columns (event_time and url)
- Filter out records whose status code is not 200
- Filter out records with an empty event_time
- Split the url on "&" and "="
    //Continue cleaning
    val filterLogs = logDF.dropDuplicates("event_time", "url")           //deduplicate on the first two columns
      .filter(x => x(3) == "200")                                        //keep only records whose status code is 200
      .filter(x => StringUtils.isNotEmpty(x(0).toString))                //drop records with an empty event_time

    //Split the url on "&" and "="
    val full_logs_RDD: RDD[Row] = filterLogs.map(line => {
      val str = line.getAs[String]("url")            //get the url column
      val paramArray = str.split("\\?")              //split on "?"
      var paramMap: Map[String, String] = Map()      //holds the key=value pairs; stays empty when the url has no query string
      if (paramArray.length == 2) {
        val strings = paramArray(1).split("&")       //split the part after "?" on "&"
        paramMap = strings.map(x => x.split("=")).filter(x => x.length == 2)
          .map(x => (x(0), x(1))).toMap              //put the key=value pairs into the Map
      }
      ( //a tuple may hold at most 22 elements; if you need more, nest tuples
        line.getAs[String]("event_time"),            //fields taken directly from the original Row
        paramMap.getOrElse[String]("userUID", ""),
        paramMap.getOrElse[String]("userSID", ""),
        paramMap.getOrElse[String]("actionBegin", ""),
        paramMap.getOrElse[String]("actionEnd", ""),
        paramMap.getOrElse[String]("actionType", ""),
        paramMap.getOrElse[String]("actionName", ""),
        paramMap.getOrElse[String]("actionValue", ""),
        paramMap.getOrElse[String]("actionTest", ""),
        paramMap.getOrElse[String]("ifEquipment", ""),
        line.getAs[String]("method"),
        line.getAs[String]("status"),
        line.getAs[String]("sip"),
        line.getAs[String]("user_uip"),
        line.getAs[String]("action_prepend"),
        line.getAs[String]("action_client")
      )
    }).toDF().rdd

    val full_logs_schema = StructType( //schema for the flattened records
      Array(
        StructField("event_time", StringType),
        StructField("userUID", StringType),
        StructField("userSID", StringType),
        StructField("actionBegin", StringType),
        StructField("actionEnd", StringType),
        StructField("actionType", StringType),
        StructField("actionName", StringType),
        StructField("actionValue", StringType),
        StructField("actionTest", StringType),
        StructField("ifEquipment", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val full_logDF = spark.createDataFrame(full_logs_RDD, full_logs_schema) //the final cleaned DataFrame
3. Write the data to MySQL tables
Before running the write program, create the target database first.
In MySQL:
create database if not exists etl;
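Besides creating the database, the MySQL JDBC driver must be on the job's classpath. In an sbt project (an assumption here; the version number is only illustrative, chosen because Connector/J 5.x provides the com.mysql.jdbc.Driver class used below), the dependency could be declared as:
    // build.sbt (illustrative)
    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.47"
When submitting to a cluster, passing the connector jar via spark-submit --jars works as well.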
- Save the data
    //Connect to MySQL and store the data
    val properties = new Properties()
    properties.setProperty("url", JdbcUtil.url)        //you can hard-code your own connection settings here, but it is better to wrap them in a utility
    properties.setProperty("user", JdbcUtil.user)
    properties.setProperty("password", JdbcUtil.password)
    properties.setProperty("driver", JdbcUtil.driver)

    //Write to MySQL
    println("writing filterLogs to MySQL")
    filterLogs.write.mode(SaveMode.Overwrite).jdbc(JdbcUtil.url, "access_logs", properties)
    println("write finished")
    println("writing full_logDF to MySQL")
    full_logDF.write.mode(SaveMode.Overwrite).jdbc(JdbcUtil.url, "full_access_logs", properties)
    println("write finished")

    spark.stop()
If you wrap the MySQL write and read logic in utility methods, you can simply call those methods instead of repeating the JDBC boilerplate, as in the example below. Remember to import the utility (etl.util.JdbcUtil) wherever it is used.
package etl.util

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util.Properties

//Simple JDBC helper that centralizes the MySQL connection settings
object JdbcUtil {
  val url = "jdbc:mysql://192.168.XXX.100:3306/etl"
  val user = "root"
  val password = "root"
  val driver = "com.mysql.jdbc.Driver"

  val properties = new Properties()
  properties.setProperty("url", url)
  properties.setProperty("user", user)
  properties.setProperty("password", password)
  properties.setProperty("driver", driver)

  //Read a MySQL table into a DataFrame
  def getDataFrame(spark: SparkSession, tableName: String): DataFrame = {
    spark.read.jdbc(url, tableName, properties)
  }

  //Overwrite a MySQL table with the given DataFrame
  def dataFrameToMysql(df: DataFrame, tableName: String) = {
    df.write.mode(SaveMode.Overwrite).jdbc(url, tableName, properties)
    println("write finished")
  }
}
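With this helper in place, the explicit Properties blocks are no longer needed. A brief usage sketch, assuming the filterLogs and full_logDF DataFrames from the ETL step and the spark session are in scope:
    //Write the cleaned data (equivalent to the write.jdbc calls shown earlier)
    JdbcUtil.dataFrameToMysql(filterLogs, "access_logs")
    JdbcUtil.dataFrameToMysql(full_logDF, "full_access_logs")
    //Read a table back for the analysis jobs
    val fullLogs = JdbcUtil.getDataFrame(spark, "full_access_logs")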
II. User Retention Rate Analysis
import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SaveMode, SparkSession}
import etl.util.JdbcUtil
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
/*
 * Retention rate
 * */
object Retention {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ETL")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    //Connect to MySQL: plain version, which can later be replaced by the JdbcUtil helper
    val properties = new Properties()
    properties.setProperty("url", JdbcUtil.url)
    properties.setProperty("user", JdbcUtil.user)
    properties.setProperty("password", JdbcUtil.password)
    properties.setProperty("driver", JdbcUtil.driver)

    val logs = spark.read.jdbc(JdbcUtil.url, "full_access_logs", properties)
    logs.cache()

    //Registration events: actionName == "Registered"
    val registered = logs.filter($"actionName" === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select("userUID", "register_time")

    //Sign-in events: actionName == "Signin"
    val signin = logs.filter($"actionName" === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select("userUID", "signin_time")

    val joined = registered.join(signin, Seq("userUID"), "left")

    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    //Register a UDF that takes a timestamp such as 2018-09-04T20:34:45+08:00
    //and returns the day as a Long (milliseconds since the epoch)
    val datetrans = spark.udf.register("datetrans", (event_time: String) => {
      if (StringUtils.isEmpty(event_time))
        0L                                              //return 0L so the UDF always returns Long
      else
        sdf.parse(event_time.substring(0, 10)).getTime
    })

    val joined2 = joined.withColumn("register_date", datetrans($"register_time"))
      .withColumn("signin_date", datetrans($"signin_time"))
    //    joined2.show(false)

    //Milliseconds in one day: 86,400,000
    val daysc = 86400000

    //Number of new (registered) users per day
    val result = joined2.groupBy("register_date").agg(countDistinct("userUID").cast(DoubleType).as("num"))
    //    result.show()

    //1-day retention; for 7-day retention, adjust daysc (or use >=), see the sketch after this code
    val result1 = joined2.filter($"register_date" + daysc === $"signin_date")
      .groupBy($"register_date").agg(countDistinct("userUID").as("num1")) //users retained on day 1
    // 355 381

    val result_one_day = result.join(result1, "register_date")
      .select($"register_date", round($"num1" / $"num", 2).as("one_day_retention"))

    //Write the result with the helper defined above
    //    result_one_day.write.mode(SaveMode.Overwrite).jdbc(JdbcUtil.url,"",properties)
  }
}
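As the comment in the code notes, the same pattern yields an N-day retention rate by scaling daysc. A minimal sketch for 7-day retention, not part of the original post, reusing joined2, result and daysc from inside main above:
    //Sketch only: users who sign in exactly 7 days after registering
    val result7 = joined2.filter($"register_date" + daysc.toLong * 7 === $"signin_date")
      .groupBy($"register_date").agg(countDistinct("userUID").as("num7"))
    val result_seven_day = result.join(result7, "register_date")
      .select($"register_date", round($"num7" / $"num", 2).as("seven_day_retention"))
    //    result_seven_day.show()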
III. Active User Statistics
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import etl.util.JdbcUtil
import org.apache.spark.sql.functions._
/*
 * Active users: users who buy a course or start learning a course
 * */
object Active {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("etl0")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = SparkContext.getOrCreate(conf)
    import spark.implicits._

    val logs = JdbcUtil.getDataFrame(spark, "full_access_logs")
    //    logs.printSchema()

    //Keep only course-purchase and start-learning events
    val ds = logs.filter($"actionName" === "BuyCourse" || $"actionName" === "StartLearn")
    //    ds.show(false)

    //Project to (userUID, date), where the date is the first 10 characters of event_time
    val ds2 = ds.map(x => (x.getAs[String]("userUID"), x.getAs[String]("event_time").substring(0, 10)))
    //    ds2.show(false)

    //Count distinct active users per day
    ds2.withColumnRenamed("_2", "date").groupBy($"date")
      .agg(countDistinct($"_1").as("active_users")).show()
  }
}
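The daily counts above are only printed. A small sketch, not part of the original post, of persisting them with the JdbcUtil helper (the target table name active_user_stats is an assumption; these lines would go inside main, reusing ds2):
    //Sketch only: store the daily active-user counts instead of just showing them
    val activeDF = ds2.withColumnRenamed("_1", "userUID").withColumnRenamed("_2", "date")
      .groupBy($"date").agg(countDistinct($"userUID").as("active_users"))
    JdbcUtil.dataFrameToMysql(activeDF, "active_user_stats")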