使用 Pyspark 将 SQL 查询从 DataBricks 发送到 SQL Server [重复]

Posted

技术标签:

【中文标题】使用 Pyspark 将 SQL 查询从 DataBricks 发送到 SQL Server [重复]【英文标题】:Send SQL queries from DataBricks to a SQL Server using Pyspark [duplicate] 【发布时间】:2020-09-20 18:51:47 【问题描述】:

将自定义 SQL 查询发送到 Python 上的 SQL 数据库非常简单。

connection = mysql.connector.connect(host='localhost',
                                     database='Electronics',
                                     user='pynative',
                                     password='pynative@#29')

sql_select_Query = "select * from Laptop" #any custom sql statement not particularly select statement
cursor = connection.cursor()
cursor.execute(sql_select_Query)
records = cursor.fetchall()

但是,我在互联网上搜索了 Databricks 上的类似任务,但没有找到任何解决方案。值得一提的是,我可以使用 JDBC 读取和写入 SQL Server 数据库,但我想发送一个自定义 SQL 语句,例如我想在 SQL Server 数据库中执行的“批量插入”语句。

这是我使用 JDBC 从 SQL Server 读取数据的方法。

table_name="dbo.myTable"
spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)

【问题讨论】:

这个答案对你有帮助吗? 【参考方案1】:

请参考此文档:SQL Databases using JDBC:

Databricks Runtime 包含适用于 Microsoft SQL Server 和 Azure SQL 数据库的 JDBC 驱动程序。有关 Databricks 运行时中包含的 JDBC 库的完整列表,请参阅 Databricks 运行时发行说明。

本文介绍如何使用 DataFrame API 连接 SQL 使用 JDBC 的数据库以及如何控制读取的并行性 通过 JDBC 接口。本文提供了详细的示例 使用 Scala API,带有缩写的 Python 和 Spark SQL 示例 在末尾。对于连接到 SQL 的所有受支持的参数 使用 JDBC 的数据库,请参阅 JDBC To Other Databases。

Python 示例:

jdbcHostname = "<hostname>"
jdbcDatabase = "employees"
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://0:1;database=2;user=3;password=4".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)


pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

但传统的 jdbc 连接器使用逐行插入将数据写入数据库。可以使用 Spark 连接器通过批量插入将数据写入 Azure SQL 和 SQL Server。在加载大型数据集或将数据加载到使用列存储索引的表中时,它显着提高了写入性能。

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

参考:Use Spark Connector

HTH。

【讨论】:

第一个Python代码sn-p,你是怎么定义connectionProperties的?我尝试像 scala 示例提供的那样导入 java.util.Properties,但它失败了。

以上是关于使用 Pyspark 将 SQL 查询从 DataBricks 发送到 SQL Server [重复]的主要内容,如果未能解决你的问题,请参考以下文章

将 SQL 连接查询转换为 pyspark 语法

PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?

在 pyspark 中加载 SQL 查询?

Pyspark:将 sql 查询转换为 pyspark?

无法从 databricks pyspark 工作人员写入 Azure Sql DataWarehouse

将 sql server jar 添加到 pyspark 的类路径后无法查询 hive