049 DSL Statements
Posted by juncaoit
1. Notes
2. SQL program
package com.scala.it

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.BigDecimal.RoundingMode

object SparkSQLDSLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("dsl")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)

    // =================================================
    sqlContext.sql(
      """
        | SELECT
        |   deptno as no,
        |   SUM(sal) as sum_sal,
        |   AVG(sal) as avg_sal,
        |   SUM(mgr) as sum_mgr,
        |   AVG(mgr) as avg_mgr
        | FROM hadoop09.emp
        | GROUP BY deptno
        | ORDER BY deptno DESC
      """.stripMargin).show()
  }
}
3. Result
4. Refactoring the program above with the DSL
package com.scala.it

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.BigDecimal.RoundingMode

object SparkSQLDSLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("dsl")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)

    // =================================================
    sqlContext.sql(
      """
        | SELECT
        |   deptno as no,
        |   SUM(sal) as sum_sal,
        |   AVG(sal) as avg_sal,
        |   SUM(mgr) as sum_mgr,
        |   AVG(mgr) as avg_mgr
        | FROM hadoop09.emp
        | GROUP BY deptno
        | ORDER BY deptno DESC
      """.stripMargin).show()

    // =================================================
    // Read the table into a DataFrame and cache it
    val df = sqlContext.read.table("hadoop09.emp")
    df.cache()
    // =================================================
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    // ================================================= DSL version of the SQL above
    df.select("deptno", "sal", "mgr")
      .groupBy("deptno")
      .agg(
        sum("sal").as("sum_sal"),
        avg("sal").as("avg_sal"),
        sum("mgr").as("sum_mgr"),
        avg("mgr").as("avg_mgr")
      )
      .orderBy($"deptno".desc)
      .show()
  }
}
5. Result
6. Select statements
Columns can be referenced as plain strings, with col(), or with the $ interpolator.
User-defined functions can also be used inside a select (via selectExpr after registration).
package com.scala.it

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.BigDecimal.RoundingMode

object SparkSQLDSLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("dsl")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)

    // =================================================
    sqlContext.sql(
      """
        | SELECT
        |   deptno as no,
        |   SUM(sal) as sum_sal,
        |   AVG(sal) as avg_sal,
        |   SUM(mgr) as sum_mgr,
        |   AVG(mgr) as avg_mgr
        | FROM hadoop09.emp
        | GROUP BY deptno
        | ORDER BY deptno DESC
      """.stripMargin).show()

    // =================================================
    // Read the table into a DataFrame and cache it
    val df = sqlContext.read.table("hadoop09.emp")
    df.cache()
    // =================================================
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    // ================================================= Select statements
    df.select("empno", "ename", "deptno").show()
    df.select(col("empno").as("id"), $"ename".as("name"), df("deptno")).show()
    df.select($"empno".as("id"), substring($"ename", 0, 1).as("name")).show()
    df.selectExpr("empno as id", "substring(ename,0,1) as name").show()

    // Using a user-defined function
    sqlContext.udf.register(
      "doubleValueFormat", // name of the UDF
      (value: Double, scale: Int) => {
        // body of the UDF
        BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_DOWN).doubleValue()
      })
    df.selectExpr("doubleValueFormat(sal,2)").show()
  }
}
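A registered UDF can also be invoked from the DSL side instead of selectExpr, via callUDF from org.apache.spark.sql.functions. A minimal sketch, assuming Spark 1.5+ (where callUDF(name, cols*) is available) and the doubleValueFormat UDF registered above; the sal_2dp alias is just illustrative:

    // Sketch: call the previously registered UDF from the typed DSL.
    // Assumes df, sqlContext.implicits._ and org.apache.spark.sql.functions._
    // from the listing above are in scope.
    df.select(
      $"empno",
      callUDF("doubleValueFormat", $"sal", lit(2)).as("sal_2dp")
    ).show()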
7. Where statements
// ================================================= Where statements
df.where("sal > 1000 and sal < 2000").show()
df.where($"sal" > 1000 && $"sal" < 2000).show()
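For reference, filter is an alias of where in the DataFrame API, so the same predicates can be written either way (sketch, using the df from the listings above):

    // filter and where are interchangeable on a DataFrame
    df.filter("sal > 1000 and sal < 2000").show()
    df.filter($"sal" > 1000 && $"sal" < 2000).show()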
8. groupBy statements
Of the three agg forms shown below, the third is recommended and is the most common in practice.
User-defined aggregate functions are supported as well.
package com.scala.it

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.BigDecimal.RoundingMode

object SparkSQLDSLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("dsl")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)

    // =================================================
    sqlContext.sql(
      """
        | SELECT
        |   deptno as no,
        |   SUM(sal) as sum_sal,
        |   AVG(sal) as avg_sal,
        |   SUM(mgr) as sum_mgr,
        |   AVG(mgr) as avg_mgr
        | FROM hadoop09.emp
        | GROUP BY deptno
        | ORDER BY deptno DESC
      """.stripMargin).show()

    // =================================================
    // Read the table into a DataFrame and cache it
    val df = sqlContext.read.table("hadoop09.emp")
    df.cache()
    // =================================================
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    // ================================================= GroupBy statements
    // This form is not recommended; the problem is noted below
    df.groupBy("deptno").agg(
      "sal" -> "min", // computes min(sal)
      "sal" -> "max", // computes max(sal) ===> overwrites other aggregates on the same column; fix: rename the result
      "mgr" -> "max"  // computes max(mgr)
    ).show()

    sqlContext.udf.register("selfAvg", AvgUDAF)
    df.groupBy("deptno").agg(
      "sal" -> "selfAvg"
    ).toDF("deptno", "self_avg_sal").show()

    df.groupBy("deptno").agg(
      min("sal").as("min_sal"),
      max("sal").as("max_sal"),
      max("mgr")
    ).where("min_sal > 1200").show()
  }
}
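The AvgUDAF object registered above as selfAvg is not shown in this post. A minimal sketch of what such a user-defined aggregate might look like, assuming the Spark 1.x UserDefinedAggregateFunction API; the buffer layout and null handling here are illustrative, not the original implementation:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    // Illustrative sketch of an average UDAF; not the original AvgUDAF from this post
    object AvgUDAF extends UserDefinedAggregateFunction {
      // One input column: the value to average
      override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
      // The buffer keeps a running sum and a row count
      override def bufferSchema: StructType =
        StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)
      override def dataType: DataType = DoubleType
      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0.0
        buffer(1) = 0L
      }

      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getDouble(0) + input.getDouble(0)
          buffer(1) = buffer.getLong(1) + 1L
        }
      }

      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }

      override def evaluate(buffer: Row): Any =
        if (buffer.getLong(1) == 0L) null else buffer.getDouble(0) / buffer.getLong(1)
    }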
9. Sorting with sort and orderBy
// ================================================= Sorting
// sort, orderBy                      ==> globally ordered result
// repartition + sortWithinPartitions ==> ordered only within each partition
df.sort("sal").select("empno", "sal").show()
df.repartition(3).sort($"sal".desc).select("empno", "sal").show()
df.repartition(3).orderBy($"sal".desc).select("empno", "sal").show()
df.repartition(3).sortWithinPartitions($"sal".desc).select("empno", "sal").show()
10. Window functions
// ================================================= Hive window (analytic) functions
// The DataFrame must be built with a HiveContext
// Use row_number to implement grouped top-N: partition the data by given columns,
// then take the top N rows within each partition
val window = Window.partitionBy("deptno").orderBy($"sal".desc)
df.select(
  $"empno",
  $"ename",
  $"deptno",
  row_number().over(window).as("rnk")
).where("rnk <= 3").show()
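For comparison, the same top-3-per-department query can be expressed directly in HiveQL through sqlContext.sql; a sketch, assuming the same hadoop09.emp table as above:

    // Sketch: SQL formulation of the row_number top-N query above
    sqlContext.sql(
      """
        | SELECT empno, ename, deptno, rnk FROM (
        |   SELECT empno, ename, deptno,
        |          ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rnk
        |   FROM hadoop09.emp
        | ) t
        | WHERE rnk <= 3
      """.stripMargin).show()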
Part 2: Complete program
package com.scala.it

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.BigDecimal.RoundingMode

object SparkSQLDSLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("dsl")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)

    // =================================================
    sqlContext.sql(
      """
        | SELECT
        |   deptno as no,
        |   SUM(sal) as sum_sal,
        |   AVG(sal) as avg_sal,
        |   SUM(mgr) as sum_mgr,
        |   AVG(mgr) as avg_mgr
        | FROM hadoop09.emp
        | GROUP BY deptno
        | ORDER BY deptno DESC
      """.stripMargin).show()

    // =================================================
    // Read the table into a DataFrame and cache it
    val df = sqlContext.read.table("hadoop09.emp")
    df.cache()
    // =================================================
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    // ================================================= DSL version of the SQL above
    df.select("deptno", "sal", "mgr")
      .groupBy("deptno")
      .agg(
        sum("sal").as("sum_sal"),
        avg("sal").as("avg_sal"),
        sum("mgr").as("sum_mgr"),
        avg("mgr").as("avg_mgr")
      )
      .orderBy($"deptno".desc)
      .show()

    // ================================================= Select statements
    df.select("empno", "ename", "deptno").show()
    df.select(col("empno").as("id"), $"ename".as("name"), df("deptno")).show()
    df.select($"empno".as("id"), substring($"ename", 0, 1).as("name")).show()
    df.selectExpr("empno as id", "substring(ename,0,1) as name").show()

    // Using a user-defined function
    sqlContext.udf.register(
      "doubleValueFormat", // name of the UDF
      (value: Double, scale: Int) => {
        // body of the UDF
        BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_DOWN).doubleValue()
      })
    df.selectExpr("doubleValueFormat(sal,2)").show()

    // ================================================= Where statements
    df.where("sal > 1000 and sal < 2000").show()
    df.where($"sal" > 1000 && $"sal" < 2000).show()

    // ================================================= GroupBy statements
    // This form is not recommended; the problem is noted below
    df.groupBy("deptno").agg(
      "sal" -> "min", // computes min(sal)
      "sal" -> "max", // computes max(sal) ===> overwrites other aggregates on the same column; fix: rename the result
      "mgr" -> "max"  // computes max(mgr)
    ).show()

    sqlContext.udf.register("selfAvg", AvgUDAF)
    df.groupBy("deptno").agg(
      "sal" -> "selfAvg"
    ).toDF("deptno", "self_avg_sal").show()

    df.groupBy("deptno").agg(
      min("sal").as("min_sal"),
      max("sal").as("max_sal"),
      max("mgr")
    ).where("min_sal > 1200").show()

    // ================================================= Sorting
    // sort, orderBy                      ==> globally ordered result
    // repartition + sortWithinPartitions ==> ordered only within each partition
    df.sort("sal").select("empno", "sal").show()
    df.repartition(3).sort($"sal".desc).select("empno", "sal").show()
    df.repartition(3).orderBy($"sal".desc).select("empno", "sal").show()
    df.repartition(3).sortWithinPartitions($"sal".desc).select("empno", "sal").show()

    // ================================================= Hive window (analytic) functions
    // The DataFrame must be built with a HiveContext
    // Use row_number to implement grouped top-N: partition the data by given columns,
    // then take the top N rows within each partition
    val window = Window.partitionBy("deptno").orderBy($"sal".desc)
    df.select(
      $"empno",
      $"ename",
      $"deptno",
      row_number().over(window).as("rnk")
    ).where("rnk <= 3").show()
  }
}