Spark手机流量日志处理使用SparkSQL按月统计流量使用量最多的用户
Posted 大数据小禅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark手机流量日志处理使用SparkSQL按月统计流量使用量最多的用户相关的知识,希望对你有一定的参考价值。
🚀 作者 :“大数据小禅”
🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪
手机流量日志处理
SparkSQL简介
-
Spark SQL是Apache Spark的一个模块,提供了一种基于结构化数据的编程接口。它允许用户使用SQL语句或DataFrame API来查询和操作数据,同时还支持使用Spark的分布式计算引擎进行高效的并行计算。
-
Spark SQL支持多种数据源,包括Hive、JSON、Parquet、Avro、ORC等,这些数据源可以通过DataFrame API或SQL语句进行查询和操作。同时,Spark SQL还提供了一些高级功能,如窗口函数、聚合函数、UDF等,以满足更复杂的数据分析需求。
-
Spark SQL还支持将SQL查询结果写入到外部数据源,如Hive表、JSON文件、Parquet文件等。此外,Spark SQL还提供了一些工具,如Spark SQL CLI、JDBC/ODBC驱动程序等,方便用户进行交互式查询和数据分析。
-
使用前需要新引入对应依赖
依赖引入
使用Spark SQL需要在项目中添加以下依赖:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。
如果需要使用其他数据源,如mysql、Hive等,则需要添加相应的依赖。例如,如果需要连接MySQL数据库,则需要添加以下依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
其中,spark-sql-kafka-0-10_2.12是连接Kafka数据源的依赖,mysql-connector-java是连接MySQL数据库的依赖。注意,版本号也可以根据实际情况进行调整。
以上是使用Maven进行依赖配置的方式。
SparkSQL快速入门案例
- 准备数据
- 我们假设有一个CSV文件employee.csv,包含了员工的信息,如下所示:
id,name,age,gender,salary
1,Jack,25,M,5000
2,Lucy,28,F,6000
3,Tom,30,M,8000
4,Lily,27,F,7000
5,David,32,M,9000
创建SparkSession对象
首先,我们需要创建一个SparkSession对象,它是Spark SQL的入口点。可以使用以下代码创建SparkSession对象:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.appName("Spark SQL Demo")
.getOrCreate()
//加载CSV文件
//使用SparkSession对象的read方法加载CSV文件:
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("employee.csv")
//其中,header=true表示第一行是列名,inferSchema=true表示自动推断列的数据类型。
//创建临时表
//使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时表:
df.createOrReplaceTempView("employee")
//执行SQL查询
//使用SparkSession对象的sql方法执行SQL查询:
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
这将返回所有年龄大于27岁的员工信息。
//输出结果
//使用DataFrame的show方法输出查询结果:
result.show()
//这将输出所有符合条件的员工信息。
- 完整代码如下:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.appName("Spark SQL Demo")
.getOrCreate()
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("employee.csv")
df.createOrReplaceTempView("employee")
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
result.show()
输出结果:
+---+----+---+------+-----+
| id|name|age|gender|salary|
+---+----+---+------+-----+
| 2|Lucy| 28| F| 6000|
| 3| Tom| 30| M| 8000|
| 5|David| 32| M| 9000|
+---+----+---+------+-----+
手机流量日志数据格式与处理要求
- 日志字段与字段说明如下
1.需要实现的需求1.按月统计流量使用量最多的用户(每个月使用流量最多的用户)
2.将结果数据持久化到硬盘
处理程序
/**
* @Description
* @Author xiaochan
* @Version 1.0
*/
// 时间戳 手机号码 基站物理地址 ip 接受数 接受数据包 上行流量 下行流量 状态码
//2020-03-10 15707126156 QK-X7-7N-G2-1N-QZ:CMCC 212.188.187.220 33 40 67584 81920 200
//使用量 =上+下 手机号码就是用户 RDD处理方式->((月,号码),(上行+下行))
//1.下载手机流量日志
//2.按月统计流量使用量最多的用户
//3.将结果数据持久化到硬盘
object LogPhone
System.setProperty("hadoop.home.dir","F:\\\\hadoop-2.7.3\\\\hadoop-2.7.3")
def main(args: Array[String]): Unit =
//1.创建sparksession
val sc = new sql.SparkSession.Builder()
.appName("test")
.master("local[6]")
.config("spark.testing.memory", "471859201")
.getOrCreate()
// 读取输入文件
val log = sc.sparkContext.textFile("dataset\\\\phone.log")
val value = log.map(_.split("\\t")).filter(arr =>
!(arr(1) == null)
).map(tmp =>
//处理日期 获取月份
val month: String = tmp(0).split("-")(1)
//号码
val user = tmp(1)
//使用流量数
var use = tmp(6) + tmp(7)
Log(user, use.toLong, month)
)
sc.createDataFrame(value).createOrReplaceTempView("log")
//每个月流量使用做多的用户 group by行数会减少,开窗函数over()行数不会减少
val data: DataFrame = sc.sql("select user,month,useall from " +
"(select user,month,sum(use) over(partition by user,month order by use desc) as useall," +
"dense_rank() over(partition by month order by use desc) as rn from log)t1 where rn=1 order by month")
data.show()
data.write.parquet("dataset\\\\output\\\\directory")
sc.close()
/**
* @Description
* @Author xiaochan
* @Version 1.0
*/
case class Log(
user: String,
use: Long,
month: String)
- 结果如下
spark中使用sparksql对日志进行分析(属于小案例)
一:使用sparksql开发
1.sparksql开发的两种方式
HQL:SQL语句开发
eq : sqlContext.sql("xxxx")
DSL : sparkSql中DataFrame的API调用方式
eq:val df=sqlContext.xxx
df.select("number")
二:HQL的开发案例
1.新建目录上传日志
2.开启服务
三:书写程序
1.描述
这个程序一共包括两个部分。
所以写的是两个程序。
2.程序一:对日志的描述--ApacheAccessLog
1 package com.ibeifeng.bigdata.spark.log 2 3 import scala.util.matching.Regex 4 5 /** 6 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 7 * Created by ibf on 01/15. 8 */ 9 case class ApacheAccessLog( 10 ipAddress: String, // IP地址 11 clientId: String, // 客户端唯一标识符 12 userId: String, // 用户唯一标识符 13 serverTime: String, // 服务器时间 14 method: String, // 请求类型/方式 15 endpoint: String, // 请求的资源 16 protocol: String, // 请求的协议名称 17 responseCode: Int, // 请求返回值:比如:200、401 18 contentSize: Long // 返回的结果数据大小 19 ) 20 21 /** 22 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 23 * Created by ibf on 01/15. 24 * 提供一些操作Apache Log的工具类供SparkCore使用 25 */ 26 object ApacheAccessLog { 27 // Apache日志的正则 28 val PARTTERN: Regex = 29 """^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(\\S+) (\\S+) (\\S+)" (\\d{3}) (\\d+)""".r 30 31 /** 32 * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false 33 * 34 * @param line 35 * @return 36 */ 37 def isValidateLogLine(line: String): Boolean = { 38 val options = PARTTERN.findFirstMatchIn(line) 39 40 if (options.isEmpty) { 41 false 42 } else { 43 true 44 } 45 } 46 47 /** 48 * 解析输入的日志数据 49 * 50 * @param line 51 * @return 52 */ 53 def parseLogLine(line: String): ApacheAccessLog = { 54 if (!isValidateLogLine(line)) { 55 throw new IllegalArgumentException("参数格式异常") 56 } 57 58 // 从line中获取匹配的数据 59 val options = PARTTERN.findFirstMatchIn(line) 60 61 // 获取matcher 62 val matcher = options.get 63 64 // 构建返回值 65 ApacheAccessLog( 66 matcher.group(1), // 获取匹配字符串中第一个小括号中的值 67 matcher.group(2), 68 matcher.group(3), 69 matcher.group(4), 70 matcher.group(5), 71 matcher.group(6), 72 matcher.group(7), 73 matcher.group(8).toInt, 74 matcher.group(9).toLong 75 ) 76 } 77 }
3.程序二:针对需求进行--LogAnalysis
1 package com.ibeifeng.bigdata.spark.log 2 3 import com.ibeifeng.bigdata.spark.core.ApacheAccessLog 4 import org.apache.spark.sql.{DataFrame, SQLContext} 5 import org.apache.spark.{SparkContext, SparkConf} 6 /** 7 * Created by Administrator on 2017/4/25. 8 */ 9 object LogAnalysis { 10 def main(args: Array[String]):Unit={ 11 //sqlContext 12 val conf=new SparkConf() 13 .setMaster("local[*]") 14 .setAppName("log-analysis-sparksql") 15 val sc=SparkContext.getOrCreate(conf) 16 val sqlContext=new SQLContext(sc) 17 import sqlContext.implicits._ //如果不写,下面的转换不成功 18 19 //transform 20 val path="/spark/logs/input" 21 val rdd=sc.textFile(path) 22 val apacheAccessDataFrame=rdd 23 .filter(line=>ApacheAccessLog.isValidateLogLine(line)) 24 .map(line => { 25 ApacheAccessLog.parseLogLine(line) 26 }).toDF() //rdd转换为DataFrame 27 28 //register temptable 29 apacheAccessDataFrame.registerTempTable("log_analysis_temp_table") 30 sqlContext.sql("select * from log_analysis_temp_table limit 1").show() 31 32 //需求一:求contentSize的平均值,最大值以及最小值 33 val resultDataFrame1=sqlContext.sql( 34 """ 35 |SELECT 36 |AVG(contentSize) as avg_contentSize, 37 |MAX(contentSize) as max_contentSize, 38 |MIN(contentSize) as min_contentSize 39 |FROM log_analysis_temp_table 40 """.stripMargin) 41 resultDataFrame1.show() 42 43 //save //save as HDFS 44 val resultRdd=resultDataFrame1.map(row=>{ 45 val avgSize=row.getAs[Double]("avg_contentSize") 46 val minSize=row.getAs[Long]("min_contentSize") 47 val maxSize=row.getAs[Long]("max_contentSize") 48 (avgSize,minSize,maxSize) 49 }) 50 resultRdd.saveAsTextFile(s"/spark/logs/output/sql_${System.currentTimeMillis()}") 51 52 //需求二:求各个返回值出现的数据个数 53 val resultDataFrame2=sqlContext.sql( 54 """ 55 |SELECT 56 |responseCode AS code, 57 |COUNT(1) AS count 58 |FROM log_analysis_temp_table 59 |GROUP BY responseCode 60 """.stripMargin 61 ) 62 resultDataFrame2.show() 63 64 //需求三:求访问次数大于N的IP地址,并对黑名单进行限制 65 val blackIP=Array("200-55-104-193.ds1.prima.net.ar","10.0.0.153","208-38-57-205.ip.cal.radiant.net") 66 val N=10 67 val resultDataFrame3=sqlContext.sql( 68 s""" 69 |SELECT 70 |ipAddress AS ip, 71 |COUNT(1) AS count 72 |FROM log_analysis_temp_table 73 |WHERE not(ipAddress in(${blackIP.map(ip=>s"\'${ip}\'").mkString(",")})) 74 |GROUP BY ipAddress 75 |HAVING count>${N} 76 """.stripMargin) 77 resultDataFrame3.show() 78 79 //需求四:求访问次数最多的前k个endpoint的值 80 val k=10 81 val resultDataFrame4=sqlContext.sql( 82 s""" 83 |SELECT 84 | t.endpoint, 85 | t.count 86 |FROM( 87 |SELECT 88 | endpoint, 89 | COUNT(1) AS count 90 |FROM log_analysis_temp_table 91 |GROUP BY endpoint) t 92 |ORDER BY t.count DESC 93 |limit ${k} 94 """.stripMargin) 95 resultDataFrame4.show() 96 } 97 }
4.运行结果
以上是关于Spark手机流量日志处理使用SparkSQL按月统计流量使用量最多的用户的主要内容,如果未能解决你的问题,请参考以下文章
sparkStreaming结合sparkSql进行日志分析