SparkSQL

Posted aidata

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL相关的知识,希望对你有一定的参考价值。

一、概述

 

组件

技术图片

 

 

技术图片

 

 

 

运行机制

技术图片

 

 

转 SparkSQL – 从0到1认识Catalyst  https://blog.csdn.net/qq_36421826/article/details/81988157

深入研究Spark SQL的Catalyst优化器(原创翻译)

 

 

 

更高效

技术图片

 

 

 

 

 

查询优化

技术图片

技术图片

 

 

 优化:把filter提前

 

 数据源优化

技术图片

 

 

 编译优化 Code generation

技术图片

 

 

 

技术图片

 

 

 

DataSet和DataFrame

技术图片

 

 

 

技术图片

 

 

 

 

数据源

技术图片

 

 

 Parquet文件

技术图片

 

 

 Json文件

技术图片

 

 

 读取Hive中文件

技术图片

 

 

 外部数据源spark.read.format

技术图片

 

 

 

 

二、程序设计

常规流程

技术图片

 

 

 

API:SQL与DataFrame DSL

技术图片

 

 

 

 

 

技术图片

 

 

 

 

技术图片

 

 

 

 

 

 

统计分析内容大小-全部内容大小,日志条数,最小内容大小,最大内容大小

package org.sparkcourse.log

import org.apache.spark.sql.Row, SparkSession

object LogAnalyzerSQL 
  def main(args: Array[String]): Unit = 
    
    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")

    // 统计分析内容大小-全部内容大小,日志条数,最小内容大小,最大内容大小
    val contentSizeStats: Row = spark.sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM logs").first()
    val sum = contentSizeStats.getLong(0)
    val count = contentSizeStats.getLong(1)
    val min = contentSizeStats.getLong(2)
    val max = contentSizeStats.getLong(3)
    println("sum %s, count %s, min %s, max %s".format(sum, count, min, max))
    println("avg %s", sum / count)
    spark.close()


  

ApacheAccessLog

package org.sparkcourse.log

import sun.security.x509.IPAddressName

case class ApacheAccessLog(ipAddress: String,
                           clientIdentd: String,
                           userId: String,
                           dateTime: String,
                           method: String,
                           endpoint: String,
                           protocol: String,
                           responseCode: Int,
                           contentSize: Long)


object ApacheAccessLog 
  // 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
  val PATTERN = """^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s+\\-\\d4)\\] "(\\S+) (\\S+) (\\S+)" (\\d3) (\\d+)""".r

  def parseLogLine(log: String): ApacheAccessLog = 
    log match 
      case PATTERN(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode, contentSize)
        => ApacheAccessLog(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode.toInt, contentSize.toLong)
      case _ => throw new RuntimeException(s"""Cannot parse log line: $log""")
    
  

 

统计每种返回码的数量

package org.sparkcourse.log

import org.apache.spark.sql.Row, SparkSession

object LogAnalyzerSQL 
  def main(args: Array[String]): Unit = 

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")



    // 统计每种返回码的数量.
    val responseCodeToCount = spark.sql("SELECT responseCode, COUNT(*) FROM logs GROUP BY responseCode LIMIT 100")
      .map(row => (row.getInt(0), row.getLong(1)))
      .collect()
    responseCodeToCount.foreach(print(_))
  

 

统计哪个IP地址访问服务器超过10次

package org.sparkcourse.log

import org.apache.spark.sql.Row, SparkSession

object LogAnalyzerSQL 
  def main(args: Array[String]): Unit = 

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")





    // 统计哪个IP地址访问服务器超过10次
    val ipAddresses = spark.sql("SELECT ipAddress, COUNT(*) AS total FROM logs GROUP BY ipAddress HAVING total > 10 LIMIT 100")
      .map(row => row.getString(0))
      .collect()
    ipAddresses.foreach(println(_))
  

 

查询访问量最大的访问目的地址

package org.sparkcourse.log

import org.apache.spark.sql.Row, SparkSession

object LogAnalyzerSQL 
  def main(args: Array[String]): Unit = 

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")




    // 查询访问量最大的访问目的地址
    val topEndpoints = spark.sql("SELECT endpoint, COUNT(*) AS total FROM logs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
      .map(row => (row.getString(0), row.getLong(1)))
      .collect()
    topEndpoints.foreach(println(_))

  

 

以上是关于SparkSQL的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL 介绍

Spark SQL

寒假第十天

Spark学习 Spark SQL

Spark SQL知识点大全与实战

大数据高级开发工程师——Spark学习笔记