spark sql - 如何在 spark sql 中编写动态查询
Posted
技术标签:
【中文标题】spark sql - 如何在 spark sql 中编写动态查询【英文标题】:spark sql - how to write dynamic query in spark sql 【发布时间】:2018-10-08 18:34:21 【问题描述】:我有一张蜂巢桌。我想创建动态火花 SQL 查询。在火花提交时,我指定规则名称。基于规则名称的查询应该生成。在提交火花时,我必须指定规则名称。例如:
sparks-submit <RuleName> IncorrectAge
它应该触发我的 scala 目标代码:
select tablename, filter, condition from all_rules where rulename="IncorrectAge"
我的表:规则(输入表)
|---------------------------------------------------------------------------|
| rowkey| rule_name|rule_run_status| tablename |condition|filter |level|
|--------------------------------------------------------------------------|
| 1 |IncorrectAge| In_Progress | VDP_Vendor_List| age>18 gender=Male|NA|
|---------------------------------------------------------------------------
|2 | Customer_age| In_Progress | Customer_List | age<25 gender=Female|NA|
|----------------------------------------------------------------------------
我获取规则名称:
select tablename, filter, condition from all_rules where rulename="IncorrectAge";
执行此查询后,我得到如下结果:
|----------------------------------------------|
|tablename | filter | condition |
|----------------------------------------------|
|VDP_Vendor_List | gender=Male | age>18 |
|----------------------------------------------|
现在我想动态进行 spark sql 查询
select count(*) from VDP_Vendor_List // first column --tablename
select count(*) from VDP_Vendor_List where gender=Male --tablename and filter
select * from EMP where gender=Male AND age >18 --tablename, filter, condition
我的代码-Spark 2.2 版本代码:
import org.apache.spark.sql. Row, SparkSession
import org.apache.log4j._
object allrules
def main(args: Array[String])
val spark = SparkSession.builder().master("local[*]")
.appName("Spark Hive")
.enableHiveSupport().getOrCreate();
import spark.implicits._
val sampleDF = spark.read.json("C:/software/sampletableCopy.json") // for testing purpose i converted hive table to json data
sampleDF.registerTempTable("sampletable")
val allrulesDF = spark.sql("SELECT * FROM sampletable")
allrulesDF.show()
val TotalCount: Long = allrulesDF.count()
println("==============> Total count ======>" + allrulesDF.count())
val df1 = allrulesDF.select(allrulesDF.col("tablename"),allrulesDF.col("condition"),allrulesDF.col("filter"),allrulesDF.col("rule_name"))
df1.show()
val df2= df1.where(df1.col("rule_name").equalTo("IncorrectAge")).show()
println(df2)
// var table_name = ""
// var condition =""
// var filter = "";
// df1.foreach(row=>
// table_name = row.get(1).toString();
// condition = row.get(2).toString();
// filter = row.get(3).toString();
// )
【问题讨论】:
我认为您的问题并不完全清楚。您能否在末尾添加几行来准确说明您要查找的内容? 从 all_rules 中选择表名、过滤器、条件 where rulename="IncorrectAge";这里 IncorrectAge 是我的规则名称。我正在使用 3 个属性(表名、过滤器、条件)。对于第一个查询,我只使用一个属性。例如,从 VDP_Vendor_List // 第一列 --tablename 中选择 count()。在第二个查询中,我使用了 2 个属性。例如- 从 VDP_Vendor_List 中选择 count(),其中 gender=Male --tablename 并过滤。 欢迎来到 Stack Overflow!其他用户将您的问题标记为低质量和需要改进。我重新措辞/格式化您的输入,使其更易于阅读/理解。请查看我的更改以确保它们反映您的意图。但我认为你的问题仍然无法回答。 你现在应该edit你的问题,添加缺失的细节(见minimal reproducible example)。如果您对我有其他问题或反馈,请随时给我留言。 请:A)永远不要在评论中添加更多信息,他们应该进入问题。 B) 尝试提出一个仅包含与您的问题相关的信息的最小问题。 【参考方案1】:您可以将参数从 spark-submit 传递到您的应用程序:
bin/spark-submit --class allrules something.jar tablename filter condition
然后,在您的主要功能中,您将拥有您的参数:
def main(args: Array[String]) : Unit =
// args(0), args(1) ... there are your params
【讨论】:
【参考方案2】:您可以像这样将参数传递给您的驱动程序类:
object DriverClass
val log = Logger.getLogger(getClass.getName)
def main(args: Array[String]): Unit =
val spark = SparkSession.builder().master("yarn").config("spark.sql.warehouse.dir", "path").enableHiveSupport().getOrCreate()
if (args == null || args.isEmpty || args.length != 2)
log.error("Invalid number of arguments passed.")
log.error("Arguments Usage: <Rule Name> <Rule Type>)
log.error("Stopping the flow")
System.exit(1)
import spark.implicits._
val ruleName: String = String.valueOf(args(0).trim())
val ruleType: String = String.valueOf(args(1).trim())
val processSQL: String="Select tablename, filter, condition from all_rules where $ruleName=$ruleType"
val metadataDF=spark.sql(processSQL)
val (tblnm,fltr,cndtn) =metadataDF.rdd.map(f=>(f.get(0).toString(),f.get(1).toString(),f.get(2).toString())).collect()(0)
val finalSql_1="select count(*) from $tblnm" // first column
val finalSql_2="select count(*) from $tblnm" where $fltr"
val finalSql_3="select * from EMP where $fltr AND $cndtn"
spark.sql(finalSql_1).show()
spark.sql(finalSql_2).show()
spark.sql(finalSql_3).show()
【讨论】:
以上是关于spark sql - 如何在 spark sql 中编写动态查询的主要内容,如果未能解决你的问题,请参考以下文章
如何在 spark dataframes/spark sql 中使用模式读取 json