Spark基础学习笔记30:Spark SQL案例分析

Posted howard2005

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark基础学习笔记30:Spark SQL案例分析相关的知识,希望对你有一定的参考价值。

文章目录

零、本讲学习目标

  1. 使用Spark SQL实现词频统计
  2. 掌握Spark SQL与Hive整合
  3. 掌握Spark SQL读写mysql
  4. 完成Spark热点搜索词统计
  5. Spark SQL智慧交通数据分析

一、使用Spark SQL实现词频统计

(一)数据源 - words.txt

(二)创建Maven项目

  • 创建Maven项目 - SparkSQLWordCount

(三)添加依赖和构建插件

  • pom.xml文件里添加依赖和构建插件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.hw.wc</groupId>
    <artifactId>SparkSQLWordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>        
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

(四)修改源目录名称

  • 将源目录名由java改成scala
  • pom.xml文件里,设置源目录

(五)创建日志属性文件

  • 在resources目录里创建log4j.properties文件
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(六)创建词频统计单例对象

  • 创建net.hw.wc包,在包里创建SparkSQLWordCount单例对象
package net.hw.wc

import org.apache.spark.sql.Dataset, SparkSession

/**
 * 功能:利用Spark SQL实现词频统计
 * 作者:华卫
 * 日期:2022年05月15日
 */
object SparkSQLWordCount 
  def main(args: Array[String]): Unit = 
    // 设置HADOOP用户名属性,否则本地运行访问会被拒绝
    System.setProperty("HADOOP_USER_NAME", "root")
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("SparkSQLWordCount")
      .master("local[*]")
      .getOrCreate()
    // 读取HDFS上的单词文件
    val lines: Dataset[String] = spark.read.textFile("hdfs://master:9000/input/words.txt")
    // 显示数据集lines内容
    lines.show()
    // 导入Spark会话对象的隐式转换
    import spark.implicits._
    // 将数据集中的数据按空格切分并合并
    val words: Dataset[String] = lines.flatMap(_.split(" "))
    // 显示数据集words内容
    words.show()
    // 将数据集默认列名由value改为word,并转换成数据帧
    val df = words.withColumnRenamed("value", "word").toDF()
    // 显示数据帧内容
    df.show()
    // 基于数据帧创建临时视图
    df.createTempView("v_words")
    // 执行SQL分组查询,实现词频统计
    val wc = spark.sql(
      """
        | select word, count(*) as count
        |    from v_words group by word
        |    order by count desc
        |""".stripMargin)
    // 显示词频统计结果
    wc.show()
    // 关闭会话
    spark.close()
  

(七)启动程序,查看结果

  • 运行SparkSQLWordCount单例对象

(八)词频统计数据转化流程图

  • 文本文件,转化成数据集,再转化成数据帧,最后基于表查询得到结果数据帧

以上是关于Spark基础学习笔记30:Spark SQL案例分析的主要内容,如果未能解决你的问题,请参考以下文章

Spark基础学习笔记26:Spark SQL数据源 - JSON数据集

Spark基础学习笔记13:Scala函数

Spark基础学习笔记29:Spark SQL内置函数

学习笔记Spark—— Spark SQL应用—— Spark DataFrame基础操作

Spark基础学习笔记27:Spark SQL数据源 - Hive表

学习笔记Spark—— Spark SQL应用—— Spark DataSet基础操作