spark-sql 中的更新语句

Posted

技术标签:

【中文标题】spark-sql 中的更新语句【英文标题】:update statement in spark-sql 【发布时间】:2018-08-09 15:48:22 【问题描述】:

有没有办法使用 spark-sql(使用 scala 语言)在 sql server 表上执行更新语句?

我需要执行以下查询:

update  MyLog_table
set     Log_FileQueue = xx,
        Log_TotalLine = xx
where   Log_ID = xxx

我尝试了以下语法:

 val jdbcUrl = s"jdbc:sqlserver://$jdbcHostname:$jdbcPort;database=$jdbcDatabase"
    val Log_FileIn = spark.read.jdbc(jdbcUrl, s"(select Log_FileIn from log Where   Log_ID = '$Process1Log_ID' ) as sq", connectionProperties)
    val newLog_FileIn = Log_FileIn.collectAsList().toString().replace("[", "").replace("]", "")

 spark.sql(s"(select '$newLog_FileIn' as Log_FileQueue, $NbLine as Log_TotalLine where Log_ID = '$newLog_id')")
  .write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, "Log", connectionProperties)

但它会产生以下错误:

org.apache.spark.sql.AnalysisException: cannot resolve '`Log_ID`' given input columns: []; line 1 pos 115;
'Project [test_141001.csv AS Log_FileQueue#290, 5 AS Log_TotalLine#29

我也尝试使用“where”方法:

spark.sql(s"(select '$newLog_FileIn' as Log_FileQueue, $NbLine as Log_TotalLine where Log_ID = '$newLog_id')")
  .where(s"Log_ID = '$newLog_id'")
  .write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, "Log", connectionProperties)

但它也不起作用。我收到以下错误:

org.apache.spark.sql.AnalysisException: cannot resolve '`Log_ID`' given input columns: [Log_FileQueue, Log_TotalLine]; line 1 pos 0;
'Filter ('Log_ID = 157456)
+- AnalysisBarrier
      +- Project [ANNONCE-FNAC-VIGICOLIS-GRX-BIZ-2018hfgr071eyzdtrf2_141001.csv AS Log_FileQueue#290, 5 AS Log_TotalLine#291]

任何帮助将不胜感激

【问题讨论】:

【参考方案1】:

不是它的工作原理。尝试 executeBatch。

【讨论】:

感谢蓝色幻影;我找到了一个解决方案: import com.microsoft.azure.sqldb.spark.config.Config import com.microsoft.azure.sqldb.spark.query._ val query = s""" |UPDATE Log |SET Log_FileQueue = '$newLog_FileIn', Log_TotalLine= $NbLine |WHERE Log_ID = 157457; """.stripMargin val config = Config(Map("url" -> "localhost", "databaseName" -> jdbcDatabase, "user" -> jdbcUsername, "password" -> jdbcPassword, "queryCustom" -> 查询)) sqlContext.sqlDBQuery(config) 那么这是一种 AZURE 方法吗? 是的。这是一种与 azure DB 或 MSSQL DB 交互的方式 有兴趣知道以备将来参考。我在 SPARK 意义上给出的答案也是正确的。

以上是关于spark-sql 中的更新语句的主要内容,如果未能解决你的问题,请参考以下文章

Spark-sql 中的 NullPointerException

spark-sql 与 spark-shell REPL 中的 Spark SQL 性能差异

控制 spark-sql 和数据帧中的字段可空性

Spark-sql 数据砖中的变量动态分配值

spark-sql/Scala 中的反透视列名是数字

Spark-SQL之DataFrame操作大全