spark中使用sparksql对日志进行分析(属于小案例)
Posted 曹军
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark中使用sparksql对日志进行分析(属于小案例)相关的知识,希望对你有一定的参考价值。
一:使用sparksql开发
1.sparksql开发的两种方式
HQL:SQL语句开发
eq : sqlContext.sql("xxxx")
DSL : sparkSql中DataFrame的API调用方式
eq:val df=sqlContext.xxx
df.select("number")
二:HQL的开发案例
1.新建目录上传日志
2.开启服务
三:书写程序
1.描述
这个程序一共包括两个部分。
所以写的是两个程序。
2.程序一:对日志的描述--ApacheAccessLog
1 package com.ibeifeng.bigdata.spark.log 2 3 import scala.util.matching.Regex 4 5 /** 6 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 7 * Created by ibf on 01/15. 8 */ 9 case class ApacheAccessLog( 10 ipAddress: String, // IP地址 11 clientId: String, // 客户端唯一标识符 12 userId: String, // 用户唯一标识符 13 serverTime: String, // 服务器时间 14 method: String, // 请求类型/方式 15 endpoint: String, // 请求的资源 16 protocol: String, // 请求的协议名称 17 responseCode: Int, // 请求返回值:比如:200、401 18 contentSize: Long // 返回的结果数据大小 19 ) 20 21 /** 22 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 23 * Created by ibf on 01/15. 24 * 提供一些操作Apache Log的工具类供SparkCore使用 25 */ 26 object ApacheAccessLog { 27 // Apache日志的正则 28 val PARTTERN: Regex = 29 """^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(\\S+) (\\S+) (\\S+)" (\\d{3}) (\\d+)""".r 30 31 /** 32 * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false 33 * 34 * @param line 35 * @return 36 */ 37 def isValidateLogLine(line: String): Boolean = { 38 val options = PARTTERN.findFirstMatchIn(line) 39 40 if (options.isEmpty) { 41 false 42 } else { 43 true 44 } 45 } 46 47 /** 48 * 解析输入的日志数据 49 * 50 * @param line 51 * @return 52 */ 53 def parseLogLine(line: String): ApacheAccessLog = { 54 if (!isValidateLogLine(line)) { 55 throw new IllegalArgumentException("参数格式异常") 56 } 57 58 // 从line中获取匹配的数据 59 val options = PARTTERN.findFirstMatchIn(line) 60 61 // 获取matcher 62 val matcher = options.get 63 64 // 构建返回值 65 ApacheAccessLog( 66 matcher.group(1), // 获取匹配字符串中第一个小括号中的值 67 matcher.group(2), 68 matcher.group(3), 69 matcher.group(4), 70 matcher.group(5), 71 matcher.group(6), 72 matcher.group(7), 73 matcher.group(8).toInt, 74 matcher.group(9).toLong 75 ) 76 } 77 }
3.程序二:针对需求进行--LogAnalysis
1 package com.ibeifeng.bigdata.spark.log 2 3 import com.ibeifeng.bigdata.spark.core.ApacheAccessLog 4 import org.apache.spark.sql.{DataFrame, SQLContext} 5 import org.apache.spark.{SparkContext, SparkConf} 6 /** 7 * Created by Administrator on 2017/4/25. 8 */ 9 object LogAnalysis { 10 def main(args: Array[String]):Unit={ 11 //sqlContext 12 val conf=new SparkConf() 13 .setMaster("local[*]") 14 .setAppName("log-analysis-sparksql") 15 val sc=SparkContext.getOrCreate(conf) 16 val sqlContext=new SQLContext(sc) 17 import sqlContext.implicits._ //如果不写,下面的转换不成功 18 19 //transform 20 val path="/spark/logs/input" 21 val rdd=sc.textFile(path) 22 val apacheAccessDataFrame=rdd 23 .filter(line=>ApacheAccessLog.isValidateLogLine(line)) 24 .map(line => { 25 ApacheAccessLog.parseLogLine(line) 26 }).toDF() //rdd转换为DataFrame 27 28 //register temptable 29 apacheAccessDataFrame.registerTempTable("log_analysis_temp_table") 30 sqlContext.sql("select * from log_analysis_temp_table limit 1").show() 31 32 //需求一:求contentSize的平均值,最大值以及最小值 33 val resultDataFrame1=sqlContext.sql( 34 """ 35 |SELECT 36 |AVG(contentSize) as avg_contentSize, 37 |MAX(contentSize) as max_contentSize, 38 |MIN(contentSize) as min_contentSize 39 |FROM log_analysis_temp_table 40 """.stripMargin) 41 resultDataFrame1.show() 42 43 //save //save as HDFS 44 val resultRdd=resultDataFrame1.map(row=>{ 45 val avgSize=row.getAs[Double]("avg_contentSize") 46 val minSize=row.getAs[Long]("min_contentSize") 47 val maxSize=row.getAs[Long]("max_contentSize") 48 (avgSize,minSize,maxSize) 49 }) 50 resultRdd.saveAsTextFile(s"/spark/logs/output/sql_${System.currentTimeMillis()}") 51 52 //需求二:求各个返回值出现的数据个数 53 val resultDataFrame2=sqlContext.sql( 54 """ 55 |SELECT 56 |responseCode AS code, 57 |COUNT(1) AS count 58 |FROM log_analysis_temp_table 59 |GROUP BY responseCode 60 """.stripMargin 61 ) 62 resultDataFrame2.show() 63 64 //需求三:求访问次数大于N的IP地址,并对黑名单进行限制 65 val blackIP=Array("200-55-104-193.ds1.prima.net.ar","10.0.0.153","208-38-57-205.ip.cal.radiant.net") 66 val N=10 67 val resultDataFrame3=sqlContext.sql( 68 s""" 69 |SELECT 70 |ipAddress AS ip, 71 |COUNT(1) AS count 72 |FROM log_analysis_temp_table 73 |WHERE not(ipAddress in(${blackIP.map(ip=>s"\'${ip}\'").mkString(",")})) 74 |GROUP BY ipAddress 75 |HAVING count>${N} 76 """.stripMargin) 77 resultDataFrame3.show() 78 79 //需求四:求访问次数最多的前k个endpoint的值 80 val k=10 81 val resultDataFrame4=sqlContext.sql( 82 s""" 83 |SELECT 84 | t.endpoint, 85 | t.count 86 |FROM( 87 |SELECT 88 | endpoint, 89 | COUNT(1) AS count 90 |FROM log_analysis_temp_table 91 |GROUP BY endpoint) t 92 |ORDER BY t.count DESC 93 |limit ${k} 94 """.stripMargin) 95 resultDataFrame4.show() 96 } 97 }
4.运行结果
以上是关于spark中使用sparksql对日志进行分析(属于小案例)的主要内容,如果未能解决你的问题,请参考以下文章