无法使用 PySpark 插入 SQL,但可以在 SQL 中使用

Posted

技术标签:

【中文标题】无法使用 PySpark 插入 SQL,但可以在 SQL 中使用【英文标题】:Cannot Insert into SQL using PySpark, but works in SQL 【发布时间】:2020-05-15 23:00:40 【问题描述】:

我使用以下 SQL 在下面创建了一个表:

CREATE TABLE [dbo].[Validation](
    [RuleId] [int] IDENTITY(1,1) NOT NULL,
    [AppId] [varchar](255) NOT NULL,
    [Date] [date] NOT NULL,
    [RuleName] [varchar](255) NOT NULL,
    [Value] [nvarchar](4000) NOT NULL
)

注意身份密钥 (RuleId)

在 SQL 中将值插入到表中时,如下所示:

注意:如果表为空并递增,则不按原样插入主键会自动填充

INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')

但是,当在 databricks 上创建临时表并执行下面的相同查询时,在 PySpark 上运行此查询如下:

       %python

        driver = <Driver>
        url = "jdbc:sqlserver:<URL>"
        database = "<db>"
        table = "dbo.Validation"
        user = "<user>"
        password = "<pass>"

        #import the data
        remote_table = spark.read.format("jdbc")\
        .option("driver", driver)\
        .option("url", url)\
        .option("database", database)\
        .option("dbtable", table)\
        .option("user", user)\
        .option("password", password)\
        .load()

        remote_table.createOrReplaceTempView("YOUR_TEMP_VIEW_NAMES")

        sqlcontext.sql("INSERT INTO YOUR_TEMP_VIEW_NAMES VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

我收到以下错误:

AnalysisException: '未知要求插入的数据与目标表的列数相同:目标表有5列但插入的数据有4列,包括0个分区列) 具有恒定值。;'

为什么它可以在 SQL 上工作,但在通过数据块传递查询时却不行?如何通过 pyspark 插入而不出现此错误?

【问题讨论】:

@DaleK,我试过 sqlContext.sql("INSERT INTO YOUR_TEMP_VIEW_NAMES (Appid,Date,RuleName,Value) VALUES (1,'2020-05-15','MemoryUsageAnomaly','2300MB') ") 但是我得到一个 Parse Exception: ParseException: "\nmismatched input 'Appid' Expecting '(', 'SELECT', 'FROM', 'DESC', 'VALUES', 'TABLE', 'INSERT', ' DESCRIBE', 'MAP', 'MERGE', 'UPDATE', 'REDUCE'(第 1 行,位置 34)\n\n== SQL ==\nINSERT INTO YOUR_TEMP_VIEW_NAMES(Appid,Date,RuleName,Value)值( 1,'2020-05-15','MemoryUsageAnomaly','2300MB')\n----------------------------- ----^^^\n" 【参考方案1】:

这里最直接的解决方案是使用 Scala 单元中的 JDBC。 EG

%scala

import java.util.Properties
import java.sql.DriverManager

val jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
val jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

// Create a Properties() object to hold the parameters.

val connectionProperties = new Properties()

connectionProperties.put("user", s"$jdbcUsername")
connectionProperties.put("password", s"$jdbcPassword")
connectionProperties.setProperty("Driver", driverClass)

val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
val stmt = connection.createStatement()
val sql = "INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')"

stmt.execute(sql)
connection.close()

您也可以使用 pyodbc,但默认情况下不安装 SQL Server ODBC 驱动程序,而 JDBC 驱动程序是。

Spark 解决方案是在 SQL Server 中创建一个视图并插入该视图。例如

create view Validation2 as
select AppId,Date,RuleName,Value
from Validation

然后

tableName = "Validation2"
df = spark.read.jdbc(url=jdbcUrl, table=tableName, properties=connectionProperties)
df.createOrReplaceTempView(tableName)
sqlContext.sql("INSERT INTO Validation2 VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

如果您想封装 Scala 并从其他语言(如 Python)调用它,您可以使用 scala package cell。

例如

%scala

package example

import java.util.Properties
import java.sql.DriverManager

object JDBCFacade 

  def runStatement(url : String, sql : String, userName : String, password: String): Unit = 
  
    val connection = DriverManager.getConnection(url, userName, password)
    val stmt = connection.createStatement()
    try
    
      stmt.execute(sql)  
    
    finally
    
      connection.close()  
    
  

然后你可以这样称呼它:

jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")

jdbcUrl = "jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

sql = "select 1 a into #foo from sys.objects"

sc._jvm.example.JDBCFacade.runStatement(jdbcUrl,sql, jdbcUsername, jdbcPassword)

【讨论】:

因为我在我的 python 中使用了 sqlcontext.sql,所以我不会得到与我上面的评论相同的解析异常吗?解析异常: ParseException: "\nmismatched input 'Appid' 期望 '(', 'SELECT', 'FROM', 'DESC', 'VALUES', 'TABLE', 'INSERT', 'DESCRIBE', 'MAP', 'MERGE', 'UPDATE', 'REDUCE'(第 1 行,第 34 位)\n\n== SQL ==\nINSERT INTO YOUR_TEMP_VIEW_NAMES(Appid,Date,RuleName,Value)VALUES (1,'2020-05- 15','MemoryUsageAnomaly','2300MB')\n---------------------------------^^^ \n" Spark SQL 不支持在 INSERT 中指定目标列。见docs.databricks.com/spark/latest/spark-sql/language-manual/…。在 Scala 示例中,这是 TSQL 而不是 Spark SQL。并且可以指定输入列,或者让 SQL Server 自动忽略 IDENTITY 列。 Browne,有什么建议可以在 python 环境中运行 Scala 吗?我需要在 python 函数中使用 scala。 好问题。是的!我刚刚了解了 Scala 封装单元,它们在这里工作。查看更新。 效果很好。感谢您的帮助!

以上是关于无法使用 PySpark 插入 SQL,但可以在 SQL 中使用的主要内容,如果未能解决你的问题,请参考以下文章

如何解析sql语句插入以使用pyspark获取值

如何在 Pyspark 的动态列列表中转义列名

在 PySpark 中使用拆分功能

通过pyspark更新hive中的插入数据

如何在 pyspark 中使用“不存在”的 SQL 条件?

无法在Pyspark中使用更新插入字符串到Delta表。