使用 PySpark JDBC 将数据帧写入 Azure SQL 数据库时性能下降

Posted

技术标签:

【中文标题】使用 PySpark JDBC 将数据帧写入 Azure SQL 数据库时性能下降【英文标题】:Slow performance while writing data frame to Azure SQL database using PySpark JDBC 【发布时间】:2020-01-13 09:32:25 【问题描述】:

我在PySpark 中使用JDBC URL 下方将data frame 写入Azure SQL Database。但是,我觉得这个写操作的性能并不达标,可以通过设置一些额外的属性来提高。是否有任何变通方法或任何参数可以添加以提高 JDBC 写入性能?

jdbcUrl = "jdbc:sqlserver://server.database.windows.net:1433;databaseName=test;enablePrepareOnFirstPreparedStatementCall=false"

下面是实际的数据帧写入语句。

  data_frame.write \
            .mode('overwrite') \
            .format('jdbc') \
            .option('driver', jdbc_driver) \
            .option('user', user) \
            .option('password', password) \
            .option('url', jdbcUrl) \
            .option('dbtable', table + '_STG') \
            .save()

【问题讨论】:

尝试通过将动态分配属性指定为 true 来将作业设置为使用全部资源。 【参考方案1】:

您可以尝试使用 Spark 到 SQL DB 连接器,在 Scala 中使用批量插入将数据写入 SQL 数据库,请参阅 Azure 官方文档Accelerate real-time big data analytics with Spark connector for Azure SQL Database and SQL ServerWrite data to Azure SQL database or SQL Server using Bulk Insert 部分,如下图所示。

所以我认为你现在的问题是如何将 Python 中的 PySpark 数据帧 data_frame 传递给 Scala 中的代码。您可以在databricks python notebook中使用表名如temp_table的数据框的函数registerTempTable作为代码和下图。

# register a temp table for a dataframe in Python
data_frame.registerTempTable("temp_table")

%scala
val scalaDF = table("temp_table")

然后在%scala之后在Scala中运行批量插入代码

%scala
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/** 
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

scalaDF.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)

【讨论】:

【参考方案2】:

可以使用 Apache Spark 连接器优化性能:SQL Server 和 Azure SQL -

首先在 Data-bricks 集群中使用 Maven Coordinate 安装 com.microsoft.sqlserver.jdbc.spark 库,然后使用以下代码。

[https://docs.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver15][1]

df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite or append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED")\
.option("batchsize", as per your need)\
.save()

【讨论】:

以上是关于使用 PySpark JDBC 将数据帧写入 Azure SQL 数据库时性能下降的主要内容,如果未能解决你的问题,请参考以下文章

将 Pyspark 数据帧加载到 postgres RDS 中的表中时出错

使用 pyspark 将数据帧写入 Kafka 时出现异常

Pyspark 将数据帧结果写入文本文件

如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件

Pyspark - 将数据帧写入 2 个不同的 csv 文件

将 PySpark 数据帧写入分区 Hive 表