如何在 Azure Databricks PySpark 中执行存储过程?

Posted

技术标签:

【中文标题】如何在 Azure Databricks PySpark 中执行存储过程?【英文标题】:How to execute a stored procedure in Azure Databricks PySpark? 【发布时间】:2020-06-06 19:28:45 【问题描述】:

我能够在 Azure Databricks 中使用 PySpark 执行简单的 SQL 语句,但我想改为执行存储过程。下面是我试过的 PySpark 代码。

#initialize pyspark
import findspark
findspark.init('C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
    .option("dbtable", table) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

#show the data loaded into dataframe
#jdbcDF.show()
sqlQueries="execute testJoin"
resultDF=spark.sql(sqlQueries)
resultDF.show(resultDF.count(),False)

这不起作用——我该怎么做?

【问题讨论】:

spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html 的文档(来自pyspark-sql 标签页)没有提到存储过程。我怀疑你必须更加努力地探索。我还没有检查当前 1.6.2 的情况。 【参考方案1】:

目前不支持通过 JDBC 连接从 azure databricks 运行存储过程。但您的选择是:

    使用pyodbc 库连接并执行您的过程。但是通过使用这个库,这意味着您将在所有工作人员空闲时在驱动程序节点上运行您的代码。有关详细信息,请参阅本文。 https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark

    使用SQL 表函数而不是过程。从某种意义上说,您可以使用可以在 SQL 查询的FORM 子句中使用的任何内容。

    由于您处于 azure 环境中,因此结合使用 azure 数据工厂(执行您的过程)和 azure databricks 可以帮助您构建非常强大的管道。

【讨论】:

@BIcube-感谢您的回复。如果我从 Databricks 执行 SQL select 语句,它会比在数据库服务器上直接执行更快吗?换句话说,在数据库中直接执行 select/insert 语句或使用 databricks sparks 节点哪个更好?【参考方案2】:

如果有人仍在寻找如何做到这一点的方法,可以使用 spark session 的内置 jdbc-connector。以下代码示例可以解决问题:

import msal

# Set url & credentials
jdbc_url = ...
tenant_id = ...
sp_client_id = ...
sp_client_secret = ...

# Write your SQL statement as a string
name = "Some passed value"

statement = f"""
EXEC Staging.SPR_InsertDummy
  @Name = 'name'
"""

# Generate an OAuth2 access token for service principal
authority = f"https://login.windows.net/tenant_id"
app = msal.ConfidentialClientApplication(sp_client_id, sp_client_secret, authority)
token = app.acquire_token_for_client(scopes="https://database.windows.net/.default")["access_token"]

# Create a spark properties object and pass the access token
properties = spark._sc._gateway.jvm.java.util.Properties()
properties.setProperty("accessToken", token)

# Fetch the driver manager from your spark context
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager

# Create a connection object and pass the properties object
con = driver_manager.getConnection(jdbc_url, properties)

# Create callable statement and execute it
exec_statement = con.prepareCall(statement)
exec_statement.execute()

# Close connections
exec_statement.close()
con.close()

有关更多信息和使用 SQL 用户凭据通过 JDBC 连接的类似方法,或有关如何获取返回参数的信息,我建议您查看这篇博文:

https://medium.com/delaware-pro/executing-ddl-statements-stored-procedures-on-sql-server-using-pyspark-in-databricks-2b31d9276811

【讨论】:

以上是关于如何在 Azure Databricks PySpark 中执行存储过程?的主要内容,如果未能解决你的问题,请参考以下文章

如何获取 Azure Databricks 笔记本运行详细信息

如何在 Python 中从 Azure Databricks 插入 Azure SQL 数据库

如何在 Azure 数据工厂的 Databricks 上运行 .Net spark 作业?

如何使用 Azure Synapse 在 Databricks 上删除表或删除行?

如何在 Azure Databricks PySpark 中执行存储过程?

如何在 Databricks 上将 Azure Synapse Dataframe 转换为 JSON?