无法使用 PySpark 插入 SQL,但可以在 SQL 中使用

Posted

技术标签:

【中文标题】无法使用 PySpark 插入 SQL,但可以在 SQL 中使用【英文标题】:Cannot Insert into SQL using PySpark, but works in SQL 【发布时间】:2020-05-15 23:00:40 【问题描述】:

我使用以下 SQL 在下面创建了一个表:

CREATE TABLE [dbo].[Validation](
    [RuleId] [int] IDENTITY(1,1) NOT NULL,
    [AppId] [varchar](255) NOT NULL,
    [Date] [date] NOT NULL,
    [RuleName] [varchar](255) NOT NULL,
    [Value] [nvarchar](4000) NOT NULL
)

注意身份密钥 (RuleId)

在 SQL 中将值插入到表中时,如下所示:

注意:如果表为空并递增,则不按原样插入主键会自动填充

INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')

但是,当在 databricks 上创建临时表并执行下面的相同查询时,在 PySpark 上运行此查询如下:

       %python

        driver = <Driver>
        url = "jdbc:sqlserver:<URL>"
        database = "<db>"
        table = "dbo.Validation"
        user = "<user>"
        password = "<pass>"

        #import the data
        remote_table = spark.read.format("jdbc")\
        .option("driver", driver)\
        .option("url", url)\
        .option("database", database)\
        .option("dbtable", table)\
        .option("user", user)\
        .option("password", password)\
        .load()

        remote_table.createOrReplaceTempView("YOUR_TEMP_VIEW_NAMES")

        sqlcontext.sql("INSERT INTO YOUR_TEMP_VIEW_NAMES VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

我收到以下错误:

AnalysisException: '未知要求插入的数据与目标表的列数相同:目标表有5列但插入的数据有4列,包括0个分区列) 具有恒定值。;'

为什么它可以在 SQL 上工作,但在通过数据块传递查询时却不行?如何通过 pyspark 插入而不出现此错误?

【问题讨论】:

@DaleK,我试过 sqlContext.sql("INSERT INTO YOUR_TEMP_VIEW_NAMES (Appid,Date,RuleName,Value) VALUES (1,'2020-05-15','MemoryUsageAnomaly','2300MB') ") 但是我得到一个 Parse Exception: ParseException: "\nmismatched input 'Appid' Expecting '(', 'SELECT', 'FROM', 'DESC', 'VALUES', 'TABLE', 'INSERT', ' DESCRIBE', 'MAP', 'MERGE', 'UPDATE', 'REDUCE'(第 1 行,位置 34)\n\n== SQL ==\nINSERT INTO YOUR_TEMP_VIEW_NAMES(Appid,Date,RuleName,Value)值( 1,'2020-05-15','MemoryUsageAnomaly','2300MB')\n----------------------------- ----^^^\n" 【参考方案1】:

这里最直接的解决方案是使用 Scala 单元中的 JDBC。 EG

%scala

import java.util.Properties
import java.sql.DriverManager

val jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
val jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

// Create a Properties() object to hold the parameters.

val connectionProperties = new Properties()

connectionProperties.put("user", s"$jdbcUsername")
connectionProperties.put("password", s"$jdbcPassword")
connectionProperties.setProperty("Driver", driverClass)

val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
val stmt = connection.createStatement()
val sql = "INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')"

stmt.execute(sql)
connection.close()

您也可以使用 pyodbc,但默认情况下不安装 SQL Server ODBC 驱动程序,而 JDBC 驱动程序是。

Spark 解决方案是在 SQL Server 中创建一个视图并插入该视图。例如

create view Validation2 as
select AppId,Date,RuleName,Value
from Validation

然后

tableName = "Validation2"
df = spark.read.jdbc(url=jdbcUrl, table=tableName, properties=connectionProperties)
df.createOrReplaceTempView(tableName)
sqlContext.sql("INSERT INTO Validation2 VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

如果您想封装 Scala 并从其他语言(如 Python)调用它,您可以使用 scala package cell。

例如

%scala

package example

import java.util.Properties
import java.sql.DriverManager

object JDBCFacade 

  def runStatement(url : String, sql : String, userName : String, password: String): Unit = 
  
    val connection = DriverManager.getConnection(url, userName, password)
    val stmt = connection.createStatement()
    try
    
      stmt.execute(sql)  
    
    finally
    
      connection.close()  
    
  

然后你可以这样称呼它:

jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")

jdbcUrl = "jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

sql = "select 1 a into #foo from sys.objects"

sc._jvm.example.JDBCFacade.runStatement(jdbcUrl,sql, jdbcUsername, jdbcPassword)

【讨论】:

因为我在我的 python 中使用了 sqlcontext.sql,所以我不会得到与我上面的评论相同的解析异常吗?解析异常: ParseException: "\nmismatched input 'Appid' 期望 '(', 'SELECT', 'FROM', 'DESC', 'VALUES', 'TABLE', 'INSERT', 'DESCRIBE', 'MAP', 'MERGE', 'UPDATE', 'REDUCE'(第 1 行,第 34 位)\n\n== SQL ==\nINSERT INTO YOUR_TEMP_VIEW_NAMES(Appid,Date,RuleName,Value)VALUES (1,'2020-05- 15','MemoryUsageAnomaly','2300MB')\n---------------------------------^^^ \n" Spark SQL 不支持在 INSERT 中指定目标列。见docs.databricks.com/spark/latest/spark-sql/language-manual/…。在 Scala 示例中,这是 TSQL 而不是 Spark SQL。并且可以指定输入列,或者让 SQL Server 自动忽略 IDENTITY 列。 Browne,有什么建议可以在 python 环境中运行 Scala 吗?我需要在 python 函数中使用 scala。 好问题。是的!我刚刚了解了 Scala 封装单元,它们在这里工作。查看更新。 效果很好。感谢您的帮助!

以上是关于无法使用 PySpark 插入 SQL,但可以在 SQL 中使用的主要内容,如果未能解决你的问题,请参考以下文章