大数据进阶之路——Spark SQL日志分析

Posted 孙中明

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据进阶之路——Spark SQL日志分析相关的知识,希望对你有一定的参考价值。

基本方案

用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…)
用户行为轨迹、流量日志

日志数据内容:

  • 1)访问的系统属性: 操作系统、浏览器等等
  • 2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等
  • 3)访问信息:session_id、访问ip(访问城市)等
2013-05-19 13:00:00     http://www.taobao.com/17/?tracker_u=1624169&type=1      B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1        http://hao.360.cn/      1.196.34.243 

数据处理流程

  • 1) 数据采集
    Flume: web日志写入到HDFS

  • 2)数据清洗
    脏数据
    Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
    清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)

  • 3)数据处理
    按照我们的需要进行相应业务的统计和分析
    Spark、Hive、MapReduce 或者是其他的一些分布式计算框架

  • 4)处理结果入库
    结果可以存放到RDBMS、NoSQL

  • 5)数据的可视化
    通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图
    ECharts、HUE、Zeppelin

数据清洗

首先通过debug 找到分割后各个字段的对应的

  • 报错
java.io.IOException: Could not locate executable null\\bin\\winutils.exe in the Hadoop binaries.

执行第一步数据清洗时候,数据能打印出来,但是不能写入本地文件,这是因为本地没有hadoop伪分布式系统

装一个插件即可

https://hiszm.lanzous.com/iWyqmhrgk0f

下载上述插件,然后,新建目录并且放入到目录里面
C:\\Data\\hadoop\\bin

然后再系统环境变量添加
HADOOP_HOME
C:\\Data\\hadoop

package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkFormatApp {

  def main(args: Array[String]): Unit = {

    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
                .master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("10000_access.log")

    //access.take(10).foreach(println)

    access.map(line=>{
      val splits = line.split(" ")
      val ip = splits(0)
      val time = splits(3) + " " + splits(4)
      val traffic = splits(9)
      val url =  splits(11).replace("\\"","")
     //(ip,DateUtils.parse(time),traffic,traffic,url)
      DateUtils.parse(time) + "\\t" + url + "\\t" + traffic + "\\t" + ip
    }).saveAsTextFile("output")

    //.take(10).foreach(println)
    //.saveAsTextFile("output")

    spark.stop()

  }
}

一般的日志处理方式,我们是需要进行分区的,
按照日志中的访问时间进行相应的分区,比如:d,h,m5(每5分钟一个分区)

二次清洗

  • 输入:访问时间、访问URL、耗费的流量、访问IP地址信息
  • 输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天
package org.sparksql

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

//访问日志工具转换类
object AccessConvertUtils {

  val struct=StructType(
    Array(
      StructField("url",StringType),
      StructField("cmsType",StringType),
      StructField("cmsId",LongType),
      StructField("traffic",LongType),
      StructField("ip",StringType),
      StructField("city",StringType),
      StructField("time",StringType),
      StructField("day",StringType)
    )
  )

//根据输入的每一行信息转化成输出的样式
  def parseLog(log:String)={
    try{
      val splits=log.split("\\t")
      val url =splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain="http://www.imooc.com/"
      val cms=url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0l
      if(cmsTypeId.length > 1){
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day =  time.substring(0,10).replaceAll("-","")
      Row(url,cmsType,cmsId,traffic,ip,city,time,day)
    }catch {
      case e : Exception => Row(0)
    }
  }
}


  • IP=>省份

使用github上已有的开源项目
1)git clone https://github.com/wzhe06/ipdatabase.git

2)编译下载的项目:mvn clean package -DskipTests

3)安装jar包到自己的maven仓库

mvn install:install-file -Dfile=C:\\Data\\ipdatabase\\target\\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

  1. 拷贝相关文件不然会报错

java.io.FileNotFoundException: file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
  1. 测试

package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkCleanApp {

  def main(args: Array[String]): Unit = {
    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("access.log")

    //accessRDD.take(10).foreach(println)

    val accessDF = spark.createDataFrame(accessRDD.map(x=>AccessConvertUtils.parseLog(x)),AccessConvertUtils.struct)

    accessDF.printSchema()
    accessDF.show()

    spark.stop
  }


}

root
 |-- url: string (nullable = true)
 |-- cmsType: string (nullable = true)
 |-- cmsId: long (nullable = true)
 |-- traffic: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- city: string (nullable = true)
 |-- time: string (nullable = true)
 |-- day: string (nullable = true)




+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|                 url|cmsType|cmsId|traffic|             ip|city|               time|     day|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc....|  video| 4500|    304|  218.75.35.226| 浙江省|2017-05-11 14:09:14|20170511|
|http://www.imooc....|  video|14623|     69| 202.96.134.133| 广东省|2017-05-11 15:25:05|20170511|
|http://www.imooc....|article|17894|    115| 202.96.134.133| 广东省|2017-05-11 07:50:01|20170511|
|http://www.imooc....|article|17896|    804|  218.75.35.226| 浙江省|2017-05-11 02:46:43|20170511|
|http://www.imooc....|article|17893|    893|222.129.235.182| 北京市|2017-05-11 09:30:25|20170511|
|http://www.imooc....|article|17891|    407|  218.75.35.226| 浙江省|2017-05-11 08:07:35|20170511|
|http://www.imooc....|article|17897|     78| 202.96.134.133| 广东省|2017-05-11 19:08:13|20170511|
|http://www.imooc....|article|17894|    658|222.129.235.182| 北京市|2017-05-11 04:18:47|20170511|
|http://www.imooc....|article|17893|    161|   58.32.19.255| 上海市|2017-05-11 01:25:21|20170511|
|http://www.imooc....|article|17895|    701|    218.22.9.56| 安徽省|2017-05-11 13:37:22|20170511|
|http://www.imooc....|article|17892|    986|  218.75.35.226| 浙江省|2017-05-11 05:53:47|20170511|
|http://www.imooc....|  video|14540|    987|   58.32.19.255| 上海市|2017-05-11 18:44:56|20170511|
|http://www.imooc....|article|17892|    610|  218.75.35.226| 浙江省|2017-05-11 17:48:51|20170511|
|http://www.imooc....|article|17893|      0|    218.22.9.56| 安徽省|2017-05-11 16:20:03|20170511|
|http://www.imooc....|article|17891|    262|   58.32.19.255| 上海市|2017-05-11 00:38:01|20170511|
|http://www.imooc....|  video| 4600|    465|  218.75.35.226| 浙江省|2017-05-11 17:38:16|20170511|
|http://www.imooc....|  video| 4600|    833|222.129.235.182| 北京市|2017-05-11 07:11:36|20170511|
|http://www.imooc....|article|17895|    320|222.129.235.182| 北京市|2017-05-11 19:25:04|20170511|
|http://www.imooc....|article|17898|    460| 202.96.134.133| 广东省|2017-05-11 15:14:28|20170511|
|http://www.imooc....|article|17899|    389|222.129.235.182| 北京市|2017-05-11 02:43:15|20170511|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+

调优点:

  1. 控制文件输出的大小: coalesce
  2. 分区字段的数据类型调整:spark.sql.sources.partitionColumnTypeInference.enabled
  3. 批量插入数据库数据,提交使用batch操作
package org.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object TopNApp {
  //最受欢迎
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._
    val videoTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)
    videoTopNDF.show()

    accessDF.createOrReplaceTempView("access_log")
    val videoTopNDF1 = spark.sql("select day,cmsId,count(1) as times from access_log where day='20170511' and cmsType = 'video' group by day,cmsId order by times desc")

    videoTopNDF1.show()


  }

  def main(args: Array[String]): Unit = {
    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
      .master("local[2]").getOrCreate()

    val accessDF= spark.read.format("parquet").load("output2/")
    accessDF.printSchema()
    accessDF.show(false)

    videoAccessTopN(spark,accessDF)
    spark.stop()
  }




}








+--------以上是关于大数据进阶之路——Spark SQL日志分析的主要内容,如果未能解决你的问题,请参考以下文章

日志分析为例进入大数据 Spark SQL 的世界 共10章

CK2255-以慕课网日志分析为例 进入大数据 Spark SQL 的世界

spark-sql的进阶案例

大数据必经之路-认识Spark

大数据必经之路-认识Spark

spark SQL 读取mysql中的数据日志分析