大数据进阶之路——Spark SQL日志分析
Posted 孙中明
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据进阶之路——Spark SQL日志分析相关的知识,希望对你有一定的参考价值。
基本方案
用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…)
用户行为轨迹、流量日志
日志数据内容:
- 1)访问的系统属性: 操作系统、浏览器等等
- 2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等
- 3)访问信息:session_id、访问ip(访问城市)等
2013-05-19 13:00:00 http://www.taobao.com/17/?tracker_u=1624169&type=1 B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1 http://hao.360.cn/ 1.196.34.243
数据处理流程
-
1) 数据采集
Flume: web日志写入到HDFS -
2)数据清洗
脏数据
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
清洗完之后的数据可以存放在HDFS(Hive/Spark SQL) -
3)数据处理
按照我们的需要进行相应业务的统计和分析
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架 -
4)处理结果入库
结果可以存放到RDBMS、NoSQL -
5)数据的可视化
通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图
ECharts、HUE、Zeppelin
数据清洗
首先通过debug
找到分割后各个字段的对应的
- 报错
java.io.IOException: Could not locate executable null\\bin\\winutils.exe in the Hadoop binaries.
执行第一步数据清洗时候,数据能打印出来,但是不能写入本地文件,这是因为本地没有hadoop伪分布式系统
装一个插件即可
https://hiszm.lanzous.com/iWyqmhrgk0f
下载上述插件,然后,新建目录并且放入到目录里面
C:\\Data\\hadoop\\bin
然后再系统环境变量添加
HADOOP_HOME
C:\\Data\\hadoop
package org.sparksql
import org.apache.spark.sql.SparkSession
object SparkFormatApp {
def main(args: Array[String]): Unit = {
//SparkSession是spark的入口类
val spark = SparkSession.builder().appName("SparkFormatApp")
.master("local[2]").getOrCreate()
val access = spark.sparkContext.textFile("10000_access.log")
//access.take(10).foreach(println)
access.map(line=>{
val splits = line.split(" ")
val ip = splits(0)
val time = splits(3) + " " + splits(4)
val traffic = splits(9)
val url = splits(11).replace("\\"","")
//(ip,DateUtils.parse(time),traffic,traffic,url)
DateUtils.parse(time) + "\\t" + url + "\\t" + traffic + "\\t" + ip
}).saveAsTextFile("output")
//.take(10).foreach(println)
//.saveAsTextFile("output")
spark.stop()
}
}
一般的日志处理方式,我们是需要进行分区的,
按照日志中的访问时间进行相应的分区,比如:d,h,m5(每5分钟一个分区)
二次清洗
- 输入:
访问时间、访问URL、耗费的流量、访问IP地址信息
- 输出:
URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天
package org.sparksql
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
//访问日志工具转换类
object AccessConvertUtils {
val struct=StructType(
Array(
StructField("url",StringType),
StructField("cmsType",StringType),
StructField("cmsId",LongType),
StructField("traffic",LongType),
StructField("ip",StringType),
StructField("city",StringType),
StructField("time",StringType),
StructField("day",StringType)
)
)
//根据输入的每一行信息转化成输出的样式
def parseLog(log:String)={
try{
val splits=log.split("\\t")
val url =splits(1)
val traffic = splits(2).toLong
val ip = splits(3)
val domain="http://www.imooc.com/"
val cms=url.substring(url.indexOf(domain) + domain.length)
val cmsTypeId = cms.split("/")
var cmsType = ""
var cmsId = 0l
if(cmsTypeId.length > 1){
cmsType = cmsTypeId(0)
cmsId = cmsTypeId(1).toLong
}
val city = IpUtils.getCity(ip)
val time = splits(0)
val day = time.substring(0,10).replaceAll("-","")
Row(url,cmsType,cmsId,traffic,ip,city,time,day)
}catch {
case e : Exception => Row(0)
}
}
}
- IP=>省份
使用github上已有的开源项目
1)git clone https://github.com/wzhe06/ipdatabase.git
2)编译下载的项目:mvn clean package -DskipTests
3)安装jar包到自己的maven仓库
mvn install:install-file -Dfile=C:\\Data\\ipdatabase\\target\\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
- 拷贝相关文件不然会报错
java.io.FileNotFoundException: file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
- 测试
package org.sparksql
import org.apache.spark.sql.SparkSession
object SparkCleanApp {
def main(args: Array[String]): Unit = {
//SparkSession是spark的入口类
val spark = SparkSession.builder().appName("SparkFormatApp")
.master("local[2]").getOrCreate()
val accessRDD = spark.sparkContext.textFile("access.log")
//accessRDD.take(10).foreach(println)
val accessDF = spark.createDataFrame(accessRDD.map(x=>AccessConvertUtils.parseLog(x)),AccessConvertUtils.struct)
accessDF.printSchema()
accessDF.show()
spark.stop
}
}
root
|-- url: string (nullable = true)
|-- cmsType: string (nullable = true)
|-- cmsId: long (nullable = true)
|-- traffic: long (nullable = true)
|-- ip: string (nullable = true)
|-- city: string (nullable = true)
|-- time: string (nullable = true)
|-- day: string (nullable = true)
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
| url|cmsType|cmsId|traffic| ip|city| time| day|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc....| video| 4500| 304| 218.75.35.226| 浙江省|2017-05-11 14:09:14|20170511|
|http://www.imooc....| video|14623| 69| 202.96.134.133| 广东省|2017-05-11 15:25:05|20170511|
|http://www.imooc....|article|17894| 115| 202.96.134.133| 广东省|2017-05-11 07:50:01|20170511|
|http://www.imooc....|article|17896| 804| 218.75.35.226| 浙江省|2017-05-11 02:46:43|20170511|
|http://www.imooc....|article|17893| 893|222.129.235.182| 北京市|2017-05-11 09:30:25|20170511|
|http://www.imooc....|article|17891| 407| 218.75.35.226| 浙江省|2017-05-11 08:07:35|20170511|
|http://www.imooc....|article|17897| 78| 202.96.134.133| 广东省|2017-05-11 19:08:13|20170511|
|http://www.imooc....|article|17894| 658|222.129.235.182| 北京市|2017-05-11 04:18:47|20170511|
|http://www.imooc....|article|17893| 161| 58.32.19.255| 上海市|2017-05-11 01:25:21|20170511|
|http://www.imooc....|article|17895| 701| 218.22.9.56| 安徽省|2017-05-11 13:37:22|20170511|
|http://www.imooc....|article|17892| 986| 218.75.35.226| 浙江省|2017-05-11 05:53:47|20170511|
|http://www.imooc....| video|14540| 987| 58.32.19.255| 上海市|2017-05-11 18:44:56|20170511|
|http://www.imooc....|article|17892| 610| 218.75.35.226| 浙江省|2017-05-11 17:48:51|20170511|
|http://www.imooc....|article|17893| 0| 218.22.9.56| 安徽省|2017-05-11 16:20:03|20170511|
|http://www.imooc....|article|17891| 262| 58.32.19.255| 上海市|2017-05-11 00:38:01|20170511|
|http://www.imooc....| video| 4600| 465| 218.75.35.226| 浙江省|2017-05-11 17:38:16|20170511|
|http://www.imooc....| video| 4600| 833|222.129.235.182| 北京市|2017-05-11 07:11:36|20170511|
|http://www.imooc....|article|17895| 320|222.129.235.182| 北京市|2017-05-11 19:25:04|20170511|
|http://www.imooc....|article|17898| 460| 202.96.134.133| 广东省|2017-05-11 15:14:28|20170511|
|http://www.imooc....|article|17899| 389|222.129.235.182| 北京市|2017-05-11 02:43:15|20170511|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
调优点:
- 控制文件输出的大小: coalesce
- 分区字段的数据类型调整:spark.sql.sources.partitionColumnTypeInference.enabled
- 批量插入数据库数据,提交使用batch操作
package org.sparksql
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object TopNApp {
//最受欢迎
def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
import spark.implicits._
val videoTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
.groupBy("day","cmsId").agg(count("cmsId")
.as("times")).orderBy($"times".desc)
videoTopNDF.show()
accessDF.createOrReplaceTempView("access_log")
val videoTopNDF1 = spark.sql("select day,cmsId,count(1) as times from access_log where day='20170511' and cmsType = 'video' group by day,cmsId order by times desc")
videoTopNDF1.show()
}
def main(args: Array[String]): Unit = {
//SparkSession是spark的入口类
val spark = SparkSession.builder().appName("SparkFormatApp")
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
.master("local[2]").getOrCreate()
val accessDF= spark.read.format("parquet").load("output2/")
accessDF.printSchema()
accessDF.show(false)
videoAccessTopN(spark,accessDF)
spark.stop()
}
}
+--------以上是关于大数据进阶之路——Spark SQL日志分析的主要内容,如果未能解决你的问题,请参考以下文章
日志分析为例进入大数据 Spark SQL 的世界 共10章