spark sql - 如何在 spark sql 中编写动态查询

Posted

技术标签:

【中文标题】spark sql - 如何在 spark sql 中编写动态查询【英文标题】:spark sql - how to write dynamic query in spark sql 【发布时间】:2018-10-08 18:34:21 【问题描述】:

我有一张蜂巢桌。我想创建动态火花 SQL 查询。在火花提交时,我指定规则名称。基于规则名称的查询应该生成。在提交火花时,我必须指定规则名称。例如:

sparks-submit  <RuleName> IncorrectAge 

它应该触发我的 scala 目标代码:

select tablename, filter, condition from all_rules where rulename="IncorrectAge"

我的表:规则(输入表)

|---------------------------------------------------------------------------|
| rowkey|  rule_name|rule_run_status| tablename     |condition|filter |level|
|--------------------------------------------------------------------------|
| 1    |IncorrectAge| In_Progress  | VDP_Vendor_List| age>18 gender=Male|NA|
|---------------------------------------------------------------------------
|2    | Customer_age| In_Progress  | Customer_List | age<25 gender=Female|NA| 
|----------------------------------------------------------------------------

我获取规则名称:

 select tablename, filter, condition from all_rules where rulename="IncorrectAge";

执行此查询后,我得到如下结果:

   |----------------------------------------------|
   |tablename        | filter         | condition |
   |----------------------------------------------|
   |VDP_Vendor_List  | gender=Male     | age>18   |
   |----------------------------------------------|

现在我想动态进行 spark sql 查询

select count(*) from VDP_Vendor_List  // first column --tablename     
       select count(*) from VDP_Vendor_List where gender=Male  --tablename and filter
        select * from EMP where gender=Male  AND  age >18       --tablename, filter, condition

我的代码-Spark 2.2 版本代码:

         import org.apache.spark.sql. Row, SparkSession 
         import org.apache.log4j._

object allrules 
  def main(args: Array[String])       
    val spark = SparkSession.builder().master("local[*]")
      .appName("Spark Hive")
      .enableHiveSupport().getOrCreate();

    import spark.implicits._
    val sampleDF = spark.read.json("C:/software/sampletableCopy.json") // for testing purpose i converted  hive table to json data
 sampleDF.registerTempTable("sampletable")
 val allrulesDF = spark.sql("SELECT * FROM sampletable")

  allrulesDF.show()
  val TotalCount: Long = allrulesDF.count()
  println("==============>  Total count ======>" + allrulesDF.count())

  val df1 =  allrulesDF.select(allrulesDF.col("tablename"),allrulesDF.col("condition"),allrulesDF.col("filter"),allrulesDF.col("rule_name"))
 df1.show()
 val df2=   df1.where(df1.col("rule_name").equalTo("IncorrectAge")).show()           
    println(df2)



//                             var table_name = ""
//                             var condition =""
   //                              var filter = "";
  //              df1.foreach(row=>    
  //                                   table_name = row.get(1).toString();
  //                                   condition = row.get(2).toString();
  //                                   filter = row.get(3).toString();                             
  //                              )

   
 

【问题讨论】:

我认为您的问题并不完全清楚。您能否在末尾添加几行来准确说明您要查找的内容? 从 all_rules 中选择表名、过滤器、条件 where rulename="IncorrectAge";这里 IncorrectAge 是我的规则名称。我正在使用 3 个属性(表名、过滤器、条件)。对于第一个查询,我只使用一个属性。例如,从 VDP_Vendor_List // 第一列 --tablename 中选择 count()。在第二个查询中,我使用了 2 个属性。例如- 从 VDP_Vendor_List 中选择 count(),其中 gender=Male --tablename 并过滤。 欢迎来到 Stack Overflow!其他用户将您的问题标记为低质量和需要改进。我重新措辞/格式化您的输入,使其更易于阅读/理解。请查看我的更改以确保它们反映您的意图。但我认为你的问题仍然无法回答。 现在应该edit你的问题,添加缺失的细节(见minimal reproducible example)。如果您对我有其他问题或反馈,请随时给我留言。 请:A)永远不要在评论中添加更多信息,他们应该进入问题。 B) 尝试提出一个仅包含与您的问题相关的信息的最小问题。 【参考方案1】:

您可以将参数从 spark-submit 传递到您的应用程序:

bin/spark-submit --class allrules something.jar tablename filter condition

然后,在您的主要功能中,您将拥有您的参数:

def main(args: Array[String]) : Unit = 

   // args(0), args(1) ... there are your params


【讨论】:

【参考方案2】:

您可以像这样将参数传递给您的驱动程序类:

    object DriverClass
    
       val log = Logger.getLogger(getClass.getName)
        def main(args: Array[String]): Unit = 
          val spark = SparkSession.builder().master("yarn").config("spark.sql.warehouse.dir", "path").enableHiveSupport().getOrCreate()
          if (args == null || args.isEmpty || args.length != 2) 
                log.error("Invalid number of arguments passed.")
                log.error("Arguments Usage: <Rule Name> <Rule Type>)
                log.error("Stopping the flow")
                System.exit(1)
            
         import spark.implicits._
         val ruleName: String = String.valueOf(args(0).trim())
         val ruleType: String = String.valueOf(args(1).trim())
         val processSQL: String="Select tablename, filter, condition from all_rules where $ruleName=$ruleType"
         val metadataDF=spark.sql(processSQL)
         val (tblnm,fltr,cndtn) =metadataDF.rdd.map(f=>(f.get(0).toString(),f.get(1).toString(),f.get(2).toString())).collect()(0)
    val finalSql_1="select count(*) from $tblnm"  // first column    
    val finalSql_2="select count(*) from $tblnm" where $fltr"
    val finalSql_3="select * from EMP where $fltr  AND  $cndtn"
    spark.sql(finalSql_1).show()
    spark.sql(finalSql_2).show()
    spark.sql(finalSql_3).show()
    

【讨论】:

以上是关于spark sql - 如何在 spark sql 中编写动态查询的主要内容,如果未能解决你的问题,请参考以下文章

如何在 spark-sql 查询中引用地图列?

如何在 spark dataframes/spark sql 中使用模式读取 json

如何在 Spark SQL 中格式化整数?

如何在 Spark Databricks 中注册 SQL 函数

了解如何在 Spark 中执行 Hive SQL

如何在数据块中使用 Spark sql 连接 Spark 数据框列