Spark手机流量日志处理使用SparkSQL按月统计流量使用量最多的用户

Posted 大数据小禅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark手机流量日志处理使用SparkSQL按月统计流量使用量最多的用户相关的知识,希望对你有一定的参考价值。

🚀 作者 :“大数据小禅”

🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪

手机流量日志处理

SparkSQL简介

  • Spark SQL是Apache Spark的一个模块,提供了一种基于结构化数据的编程接口。它允许用户使用SQL语句或DataFrame API来查询和操作数据,同时还支持使用Spark的分布式计算引擎进行高效的并行计算。

  • Spark SQL支持多种数据源,包括Hive、JSON、Parquet、Avro、ORC等,这些数据源可以通过DataFrame API或SQL语句进行查询和操作。同时,Spark SQL还提供了一些高级功能,如窗口函数、聚合函数、UDF等,以满足更复杂的数据分析需求。

  • Spark SQL还支持将SQL查询结果写入到外部数据源,如Hive表、JSON文件、Parquet文件等。此外,Spark SQL还提供了一些工具,如Spark SQL CLI、JDBC/ODBC驱动程序等,方便用户进行交互式查询和数据分析。

  • 使用前需要新引入对应依赖

依赖引入

使用Spark SQL需要在项目中添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>

其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。

如果需要使用其他数据源,如mysql、Hive等,则需要添加相应的依赖。例如,如果需要连接MySQL数据库,则需要添加以下依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

其中,spark-sql-kafka-0-10_2.12是连接Kafka数据源的依赖,mysql-connector-java是连接MySQL数据库的依赖。注意,版本号也可以根据实际情况进行调整。

以上是使用Maven进行依赖配置的方式。

SparkSQL快速入门案例

  • 准备数据
  • 我们假设有一个CSV文件employee.csv,包含了员工的信息,如下所示:
id,name,age,gender,salary
1,Jack,25,M,5000
2,Lucy,28,F,6000
3,Tom,30,M,8000
4,Lily,27,F,7000
5,David,32,M,9000

创建SparkSession对象
首先,我们需要创建一个SparkSession对象,它是Spark SQL的入口点。可以使用以下代码创建SparkSession对象:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Spark SQL Demo")
  .getOrCreate()
//加载CSV文件
//使用SparkSession对象的read方法加载CSV文件:

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("employee.csv")
//其中,header=true表示第一行是列名,inferSchema=true表示自动推断列的数据类型。

//创建临时表
//使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时表:

df.createOrReplaceTempView("employee")
//执行SQL查询
//使用SparkSession对象的sql方法执行SQL查询:
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
这将返回所有年龄大于27岁的员工信息。

//输出结果
//使用DataFrame的show方法输出查询结果:

result.show()
//这将输出所有符合条件的员工信息。
  • 完整代码如下:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Spark SQL Demo")
  .getOrCreate()

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("employee.csv")

df.createOrReplaceTempView("employee")

val result = spark.sql("SELECT * FROM employee WHERE age > 27")

result.show()
输出结果:

+---+----+---+------+-----+
| id|name|age|gender|salary|
+---+----+---+------+-----+
|  2|Lucy| 28|     F| 6000|
|  3| Tom| 30|     M| 8000|
|  5|David| 32|     M| 9000|
+---+----+---+------+-----+

手机流量日志数据格式与处理要求

  • 日志字段与字段说明如下
    1.需要实现的需求1.按月统计流量使用量最多的用户(每个月使用流量最多的用户)
    2.将结果数据持久化到硬盘

处理程序

/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */
// 时间戳         手机号码          基站物理地址             ip        接受数 接受数据包 上行流量  下行流量  状态码
//2020-03-10	15707126156	QK-X7-7N-G2-1N-QZ:CMCC	212.188.187.220	33	     40	    67584	   81920	200
//使用量 =上+下  手机号码就是用户   RDD处理方式->((月,号码),(上行+下行))
//1.下载手机流量日志
//2.按月统计流量使用量最多的用户
//3.将结果数据持久化到硬盘
object LogPhone 
  System.setProperty("hadoop.home.dir","F:\\\\hadoop-2.7.3\\\\hadoop-2.7.3")
  def main(args: Array[String]): Unit = 
    //1.创建sparksession
    val sc = new sql.SparkSession.Builder()
      .appName("test")
      .master("local[6]")
      .config("spark.testing.memory", "471859201")
      .getOrCreate()
    // 读取输入文件
    val log = sc.sparkContext.textFile("dataset\\\\phone.log")
    val value = log.map(_.split("\\t")).filter(arr => 
      !(arr(1) == null)
    ).map(tmp => 
      //处理日期 获取月份
      val month: String = tmp(0).split("-")(1)
      //号码
      val user = tmp(1)
      //使用流量数
      var use = tmp(6) + tmp(7)
      Log(user, use.toLong, month)
    )
    sc.createDataFrame(value).createOrReplaceTempView("log")
    //每个月流量使用做多的用户 group by行数会减少,开窗函数over()行数不会减少
    val data: DataFrame = sc.sql("select user,month,useall from " +
      "(select user,month,sum(use) over(partition by user,month order by use desc) as useall," +
      "dense_rank() over(partition by month order by use desc) as rn from log)t1 where rn=1 order by month")
    data.show()
    data.write.parquet("dataset\\\\output\\\\directory")

    sc.close()
  


/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */
case class Log(
    user: String,
    use: Long,
    month: String)


  • 结果如下

spark中使用sparksql对日志进行分析(属于小案例)

一:使用sparksql开发

1.sparksql开发的两种方式

  HQL:SQL语句开发

    eq : sqlContext.sql("xxxx")

  DSL : sparkSql中DataFrame的API调用方式

    eq:val df=sqlContext.xxx

       df.select("number")

 

二:HQL的开发案例

1.新建目录上传日志

  

 

2.开启服务

  

 

三:书写程序

1.描述

  这个程序一共包括两个部分。

  所以写的是两个程序。

 

2.程序一:对日志的描述--ApacheAccessLog

 1 package com.ibeifeng.bigdata.spark.log
 2 
 3 import scala.util.matching.Regex
 4 
 5 /**
 6    * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
 7    * Created by ibf on 01/15.
 8    */
 9 case class ApacheAccessLog(
10                              ipAddress: String, // IP地址
11                              clientId: String, // 客户端唯一标识符
12                              userId: String, // 用户唯一标识符
13                              serverTime: String, // 服务器时间
14                              method: String, // 请求类型/方式
15                              endpoint: String, // 请求的资源
16                              protocol: String, // 请求的协议名称
17                              responseCode: Int, // 请求返回值:比如:200、401
18                              contentSize: Long // 返回的结果数据大小
19                            )
20 
21 /**
22    * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
23    * Created by ibf on 01/15.
24    * 提供一些操作Apache Log的工具类供SparkCore使用
25    */
26 object ApacheAccessLog {
27    // Apache日志的正则
28    val PARTTERN: Regex =
29    """^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(\\S+) (\\S+) (\\S+)" (\\d{3}) (\\d+)""".r
30 
31    /**
32      * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false
33      *
34      * @param line
35      * @return
36      */
37    def isValidateLogLine(line: String): Boolean = {
38      val options = PARTTERN.findFirstMatchIn(line)
39 
40      if (options.isEmpty) {
41        false
42      } else {
43        true
44      }
45    }
46 
47    /**
48      * 解析输入的日志数据
49      *
50      * @param line
51      * @return
52      */
53    def parseLogLine(line: String): ApacheAccessLog = {
54      if (!isValidateLogLine(line)) {
55        throw new IllegalArgumentException("参数格式异常")
56      }
57 
58      // 从line中获取匹配的数据
59      val options = PARTTERN.findFirstMatchIn(line)
60 
61      // 获取matcher
62      val matcher = options.get
63 
64      // 构建返回值
65      ApacheAccessLog(
66        matcher.group(1), // 获取匹配字符串中第一个小括号中的值
67        matcher.group(2),
68        matcher.group(3),
69        matcher.group(4),
70        matcher.group(5),
71        matcher.group(6),
72        matcher.group(7),
73        matcher.group(8).toInt,
74        matcher.group(9).toLong
75      )
76    }
77  }

 

3.程序二:针对需求进行--LogAnalysis

 1 package com.ibeifeng.bigdata.spark.log
 2 
 3 import com.ibeifeng.bigdata.spark.core.ApacheAccessLog
 4 import org.apache.spark.sql.{DataFrame, SQLContext}
 5 import org.apache.spark.{SparkContext, SparkConf}
 6 /**
 7  * Created by Administrator on 2017/4/25.
 8  */
 9 object LogAnalysis {
10   def main(args: Array[String]):Unit={
11     //sqlContext
12     val conf=new SparkConf()
13       .setMaster("local[*]")
14       .setAppName("log-analysis-sparksql")
15     val sc=SparkContext.getOrCreate(conf)
16     val sqlContext=new SQLContext(sc)
17     import sqlContext.implicits._                //如果不写,下面的转换不成功
18 
19     //transform
20     val path="/spark/logs/input"
21     val rdd=sc.textFile(path)
22     val apacheAccessDataFrame=rdd
23       .filter(line=>ApacheAccessLog.isValidateLogLine(line))
24       .map(line => {
25         ApacheAccessLog.parseLogLine(line)
26     }).toDF()                                    //rdd转换为DataFrame
27 
28     //register temptable
29     apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
30     sqlContext.sql("select * from log_analysis_temp_table limit 1").show()
31 
32     //需求一:求contentSize的平均值,最大值以及最小值
33     val resultDataFrame1=sqlContext.sql(
34       """
35         |SELECT
36         |AVG(contentSize) as avg_contentSize,
37         |MAX(contentSize) as max_contentSize,
38         |MIN(contentSize) as min_contentSize
39         |FROM log_analysis_temp_table
40       """.stripMargin)
41     resultDataFrame1.show()
42 
43     //save                                         //save as HDFS
44     val resultRdd=resultDataFrame1.map(row=>{
45       val avgSize=row.getAs[Double]("avg_contentSize")
46       val minSize=row.getAs[Long]("min_contentSize")
47       val maxSize=row.getAs[Long]("max_contentSize")
48       (avgSize,minSize,maxSize)
49     })
50     resultRdd.saveAsTextFile(s"/spark/logs/output/sql_${System.currentTimeMillis()}")
51 
52     //需求二:求各个返回值出现的数据个数
53     val resultDataFrame2=sqlContext.sql(
54     """
55       |SELECT
56       |responseCode AS code,
57       |COUNT(1) AS count
58       |FROM log_analysis_temp_table
59       |GROUP BY responseCode
60     """.stripMargin
61     )
62     resultDataFrame2.show()
63 
64     //需求三:求访问次数大于N的IP地址,并对黑名单进行限制
65     val blackIP=Array("200-55-104-193.ds1.prima.net.ar","10.0.0.153","208-38-57-205.ip.cal.radiant.net")
66     val N=10
67     val resultDataFrame3=sqlContext.sql(
68     s"""
69       |SELECT
70       |ipAddress AS ip,
71       |COUNT(1) AS count
72       |FROM log_analysis_temp_table
73       |WHERE not(ipAddress in(${blackIP.map(ip=>s"\'${ip}\'").mkString(",")}))
74       |GROUP BY ipAddress
75       |HAVING count>${N}
76     """.stripMargin)
77     resultDataFrame3.show()
78 
79     //需求四:求访问次数最多的前k个endpoint的值
80     val k=10
81     val resultDataFrame4=sqlContext.sql(
82     s"""
83        |SELECT
84        |  t.endpoint,
85        |  t.count
86        |FROM(
87        |SELECT
88        |  endpoint,
89        |  COUNT(1) AS count
90        |FROM log_analysis_temp_table
91        |GROUP BY endpoint) t
92        |ORDER BY t.count DESC
93        |limit ${k}
94      """.stripMargin)
95     resultDataFrame4.show()
96   }
97 }

 

4.运行结果

  

  

  

  

  

 

  

 

  

以上是关于Spark手机流量日志处理使用SparkSQL按月统计流量使用量最多的用户的主要内容,如果未能解决你的问题,请参考以下文章

大数据Spark及SparkSQL数据倾斜现象和解决思路

sparkStreaming结合sparkSql进行日志分析

3.8 Spark 用户日志分析

spark结构化数据处理:Spark SQLDataFrame和Dataset

Spark 处理小文件

Spark SQL 结构化数据文件处理