使用 PySpark JDBC 将数据帧写入 Azure SQL 数据库时性能下降
Posted
技术标签:
【中文标题】使用 PySpark JDBC 将数据帧写入 Azure SQL 数据库时性能下降【英文标题】:Slow performance while writing data frame to Azure SQL database using PySpark JDBC 【发布时间】:2020-01-13 09:32:25 【问题描述】:我在PySpark
中使用JDBC URL
下方将data frame
写入Azure SQL Database
。但是,我觉得这个写操作的性能并不达标,可以通过设置一些额外的属性来提高。是否有任何变通方法或任何参数可以添加以提高 JDBC 写入性能?
jdbcUrl = "jdbc:sqlserver://server.database.windows.net:1433;databaseName=test;enablePrepareOnFirstPreparedStatementCall=false"
下面是实际的数据帧写入语句。
data_frame.write \
.mode('overwrite') \
.format('jdbc') \
.option('driver', jdbc_driver) \
.option('user', user) \
.option('password', password) \
.option('url', jdbcUrl) \
.option('dbtable', table + '_STG') \
.save()
【问题讨论】:
尝试通过将动态分配属性指定为 true 来将作业设置为使用全部资源。 【参考方案1】:您可以尝试使用 Spark 到 SQL DB 连接器,在 Scala 中使用批量插入将数据写入 SQL 数据库,请参阅 Azure 官方文档Accelerate real-time big data analytics with Spark connector for Azure SQL Database and SQL Server
的 Write data to Azure SQL database or SQL Server using Bulk Insert
部分,如下图所示。
所以我认为你现在的问题是如何将 Python 中的 PySpark 数据帧 data_frame
传递给 Scala 中的代码。您可以在databricks python notebook中使用表名如temp_table
的数据框的函数registerTempTable
作为代码和下图。
# register a temp table for a dataframe in Python
data_frame.registerTempTable("temp_table")
%scala
val scalaDF = table("temp_table")
然后在%scala
之后在Scala中运行批量插入代码
%scala
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
/**
Add column Metadata.
If not specified, metadata is automatically added
from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)
val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
scalaDF.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
【讨论】:
【参考方案2】:可以使用 Apache Spark 连接器优化性能:SQL Server 和 Azure SQL -
首先在 Data-bricks 集群中使用 Maven Coordinate 安装 com.microsoft.sqlserver.jdbc.spark 库,然后使用以下代码。
[https://docs.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver15][1]
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite or append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED")\
.option("batchsize", as per your need)\
.save()
【讨论】:
以上是关于使用 PySpark JDBC 将数据帧写入 Azure SQL 数据库时性能下降的主要内容,如果未能解决你的问题,请参考以下文章
将 Pyspark 数据帧加载到 postgres RDS 中的表中时出错
如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件