spark小案例——RDD,sparkSQL

Posted Z-hhhhh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark小案例——RDD,sparkSQL相关的知识,希望对你有一定的参考价值。

分别使用RDD和SparkSQL两种方式解决相同的数据分析问题;

项目数据

项目需求
使用RDD和SQL两种方式对数据清洗

  • 清洗需求如下:
  • 统计每个店铺分别有多少商品
  • 统计每个店铺的总销售额
  • 统计每个店铺销售额最高的前三商品,输出内容包括:店铺名,商品名和销售额其
  • 中销售额为0的商品不进行统计计算,例如:如果某个店铺销售为 0则不进行统计 。

涉及到的pom依赖

  <properties>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.1.2</spark.version>
  </properties>

<dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>$scala.version</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>$scala.version</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>$scala.version</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>$spark.version</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>$spark.version</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>3.0.0</version>
    </dependency>
  </dependencies>

RDD方式

val session: SparkSession = SparkSession.builder().master("local[*]").appName("unname").getOrCreate()
    val sc: SparkContext = session.sparkContext

    //RDD方式
       sc.textFile("hdfs://ip地址:9820/目录/meituan_waimai_meishi.csv")
         .mapPartitionsWithIndex((ix,it)=>
           //删除表头
           if (ix == 0)it.drop(1)
           it.map(line=>
             //拆分数据,csv默认逗号拆分
             val ps: Array[String] = line.split(",")
             //获取有用的字段:店铺名,商品名,商品的总额
             //提前计算商品总额,完成数据的转换
             (ps(2),ps(4),ps(5).toFloat*ps(7).toInt)
           )
         )
       //根据店铺名分组
         .groupBy(_._1)
       //mapValues对键值对每个value都应用一个函数,但是,key不会发生变化
         .mapValues(itshop=>
           //迭代器不支持排序
           (
             itshop.size,itshop.map(_._3).sum,
             itshop.filter(_._3>0)
               .toArray
               .sortWith(_._3 > _._3)
               .take(3)
               .map(x=>
                 s"$x._2$x._3"
               ).mkString(";")
           )
         )
         .foreach(println)


 sc.stop()
 session.close()

SparkSQL方式

 val session: SparkSession = SparkSession.builder().master("local[*]").appName("unname").getOrCreate()
 val sc: SparkContext = session.sparkContext


import session.implicits._ //隐式转换
    /*
    这里的session不是Scala中的包名,
    而是创建的sparkSession对象的变量名称,
    所以必须先创建SparkSession对象再导入。
    这里的session对象不能使用var声明,因为Scala只支持val修饰的对象的引入
    * */

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    val frame: DataFrame = session.read.format("CSV")
      .option("inferSchema", true) //是否根据文件格式推断表结构
      .option("delimiter", ",") //指定分隔符,默认为逗号
      .option("nullValue", "NULL") //填充空值
      .option("header", true) //是否存在表头
      .load("hdfs://192.168.71.200:9820/test/data/meituan_waimai_meishi.csv")
      .select($"shop_name", $"spu_name", ($"spu_price" * $"month_sales").as("month_total"))
      .cache()  //避免重复计算=persist(StorageLevel.MEMORY_AND_DISK_2)

    val top3_shopname: DataFrame = frame
      .filter($"month_total" > 0)
      .select($"shop_name", $"spu_name", $"month_total",
        dense_rank().over(Window.partitionBy($"shop_name")
          .orderBy($"month_total".desc)).as("rnk")
      ).orderBy($"rnk".desc)
      .filter($"rnk" < 3)
      .groupBy($"shop_name".as("shop_name_top3"))
      .agg(collect_list(concat_ws("_", $"spu_name", $"month_total")).as("top3"))

    frame.groupBy($"shop_name")
        .agg(count($"spu_name").as("cmm_count"),sum($"month_total").as("shop_total"))
        .join(top3_shopname,$"shop_name_top3" === $"shop_name","inner")
        .select($"shop_name",$"cmm_count",$"shop_total",$"top3")
        .collect()  //合并分区
        .foreach(println)

    //    frame.collect().foreach(println)
    //    top3_shopname.foreach(x=>println(x.toString()))

sc.stop()
session.close()

以上是关于spark小案例——RDD,sparkSQL的主要内容,如果未能解决你的问题,请参考以下文章

spark小案例——RDD,sparkSQL

spark小案例——RDD,sparkSQL

sparksql 表定义 存储在哪

spark小案例——RDD,broadcast

spark小案例——RDD,broadcast

spark小案例——RDD,broadcast