Spark框架-离线数据统计

Posted 那人独钓寒江雪.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark框架-离线数据统计相关的知识,希望对你有一定的参考价值。

数据清洗

任务简介:



第一小问:

第一步:输出日志(使用spark默认的log4j配置文件)

//此方法要放在主程序的首行,靠后对输出日志控制不起作用

Logger.getLogger("org").setLevel(Level.ERROR)

第二步:创建SparkSession对象(关闭严格模式,否则创建静态分区)

  val spark=SparkSession
      .builder()
      .appName("c_dataClear1")
      .master("local")
      .config("hive.exec.dynamic.partition.mode","nonstrict")//关闭严格模式,否则会有一个静态分区
      //hive元数据仓库目录
      .config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse")//目录地址
      .enableHiveSupport()
      .getOrCreate()

第三步:拿出所有的表并进行清洗

val  tableArray=List("customer","lineitem","nation","orders","part","partsupp","region","supplier")//所有数据表的名称
    tableArray.foreach(table =>
   

第四步:删除分区并且统计

  //2.删除分区并统计  
      var odsData=spark.sql(s"select * from ods.$table").drop("part_date")//去除原来字段
      val num=odsData.count() //查看数据表结构
      
      println(odsData.schema)//查看数据表的结构

第五步:将对于字段的日期改为timestamp类型

  • TimeStamp类型为:yyyy-MM-dd HH:mm:ss
 //3.将对于字段的日期改为timestamp类型 格式为:yyyy-MM--dd HH:mm:ss
      odsData.columns.foreach(tableName => 
       val startIndex=tableName.length -4
       val endIndex=tableName.length
       
       if (endIndex >=4)
         val tamp=tableName.substring(startIndex,endIndex)//查看每个字段中最后四位是否带有date
         if (tamp.toLowerCase.equals("date"))
           println("==================带有日期字段=============="+tableName)
           odsData=odsData.withColumn(tableName,
             date_format(col(tableName),"yyyy-MM-dd HH:mm:ss").cast("timestamp"))
         

第六步:去除重复字段并创建临时视图

odsData.distinct().createOrReplaceTempView(table)//去除重复字段并创建临时视图
      
      spark.sql(s"drop table if exists dwd.$table")
      //4.创建表
      spark.sql(s"create table if not exists dwd.$table like ods.$table")
      
      //5.插入数据
      spark.sql(s"insert overwrite table dwd.$table select * from ods.$table")
      spark.sql(s"select * from dwd.$table limit 5").show //查询前五条数据
      
      spark.sql(s"desc dwd.$table").show//查看数据结构

第七步:查看去重启后数据的条数

//6.查看去重后数据的条数

val resultNum=spark.sql(s"select * from dwd.$table").count()
  print(s"================去除重复前数据条数$num=======================")
  print(s"================去除重复前数据条数$resultNum=======================")

第七步:打包集群

第八步:进入环境并运行

第九步:代码结果



第十步:代码源码

import org.apache.log4j.Level, Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col,date_format
object C_DataClear1 
  def main(args: Array[String]): Unit = 
    Logger.getLogger("org").setLevel(Level.ERROR)
    //1:输出日志:使用spark默认的log4j配置文件设置日志输出级别 此方法要放在主程序的首行,靠后对输出日志控制不起作用
    //2.准备sparkSession对象
    val spark=SparkSession
      .builder()
      .appName("c_dataClear1")
      .master("local")
      .config("hive.exec.dynamic.partition.mode","nonstrict")//关闭严格模式,否则会有一个静态分区
      //hive元数据仓库目录
      .config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse")//目录地址
      .enableHiveSupport()
      .getOrCreate()

    //拿出所有的表进行清洗
    val tableArray=List("customer","lineitem","nation","orders","part","partsupp","region","supplier")//所有数据表的名称
    tableArray.foreach(table =>
      //2.删除分区并统计
      var odsData=spark.sql(s"select * from ods.$table").drop("part_date")//去除原来字段
      val num=odsData.count() //查看数据表结构

      println(odsData.schema)//查看数据表的结构

      //3.将对于字段的日期改为timestamp类型 格式为:yyyy-MM--dd HH:mm:ss
      odsData.columns.foreach(tableName => 
       val startIndex=tableName.length -4
       val endIndex=tableName.length

       if (endIndex >=4)
         val tamp=tableName.substring(startIndex,endIndex)//查看每个字段中最后四位是否带有date
         if (tamp.toLowerCase.equals("date"))
           println("==================带有日期字段=============="+tableName)
           odsData=odsData.withColumn(tableName,
             date_format(col(tableName),"yyyy-MM-dd HH:mm:ss").cast("timestamp"))
         
       
      )
      odsData.distinct().createOrReplaceTempView(table)//去除重复字段并创建临时视图

      spark.sql(s"drop table if exists dwd.$table")
      //4.创建表
      spark.sql(s"create table if not exists dwd.$table like ods.$table")

      //5.插入数据
      spark.sql(s"insert overwrite table dwd.$table select * from ods.$table")
      spark.sql(s"select * from dwd.$table limit 5").show //查询前五条数据

      spark.sql(s"desc dwd.$table").show//查看数据结构

      //6.查看去重后数据的条数
      val resultNum=spark.sql(s"select * from dwd.$table").count()
      print(s"================去除重复前数据条数$num=======================")
      print(s"================去除重复前数据条数$resultNum=======================")

    )
  

第二小问:

第一步:创建SparkSession对象

val spark=SparkSession
      .builder()
      .master("local")
      .appName("c1_dataClear")
      .config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

第二步:输入SQL语句

spark.sql("show databases").show
    
    spark.sql("alter table ods.customer drop partition (part_date<'20220410')")//保留三天内的数据

整体代码如下:

import org.apache.spark.sql.SparkSession

object C1_DataClear 
  def main(args: Array[String]): Unit = 
    val spark=SparkSession
      .builder()
      .master("local")
      .appName("c1_dataClear")
      .config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()
      
    spark.sql("show databases").show
    
    spark.sql("alter table ods.customer drop partition (part_date<'20220410')")//保留三天内的数据
  

第三步:打包集群

第四步:进入hive集群并且查看前十条数据

第五步:输入(SQL条件)并查看结果

以上是关于Spark框架-离线数据统计的主要内容,如果未能解决你的问题,请参考以下文章

Spark离线开发框架设计与实现

Spark SQL实现日志离线批处理

Spark SQL实现日志离线批处理

大数据分析处理框架——离线分析(hive,pig,spark)近似实时分析(Impala)和实时分析(stormspark streaming)

Spark从入门到精通

大数据中的Spark指的是啥?