如何在查询中调用 oracle 存储过程?
Posted
技术标签:
【中文标题】如何在查询中调用 oracle 存储过程?【英文标题】:How to call oracle stored procedure in queries? 【发布时间】:2019-12-23 12:43:56 【问题描述】:在我的 spark 项目中,我使用的是 spark-sql-2.4.1v。
作为我代码的一部分,我需要在我的 spark 作业中调用 oracle 存储过程。
我们正在将一个旧项目转换为具有大量基于 oracle 存储过程的逻辑的 spark。我们正在转换为 spark 的中间件逻辑......所以希望保持 procs 逻辑原样,因为有其他应用程序使用它们......因此需要在 spark 代码中调用现有的 procs。
如何调用oracle存储过程?
【问题讨论】:
为什么需要调用oracle存储过程?为什么不创建一个视图或表来执行此操作,以便 Spark 不必处理特定于 oracle 的功能? 【参考方案1】:python 中的 cx_Oracle 模块可用于从 python / pyspark 脚本调用 oracle 存储过程。 文档在这里 - https://cx-oracle.readthedocs.io/en/latest/user_guide/plsql_execution.html
如果由于任何原因,cx_Oracle 在 hadoop 环境中无法工作(因为它需要安装 oracle 客户端二进制文件),我们可以使用下面的 Spark JDBC 方法。
在 PySpark JDBC 选项中,有一个名为 sessionInitStatement 的属性,可用于在 JDBC 进程开始读取数据之前执行自定义 SQL 语句或 PL/SQL 块。 spark JDBC read 中的此选项可用于调用存储过程,如下所示。
这里我们首先使用 sessionInitStatement 执行 PL/SQL 过程,然后使用 spark/jdbc read 从存储过程中读取最终数据集。
from pyspark.sql import SparkSession, HiveContext
spark = (SparkSession.builder.enableHiveSupport().getOrCreate())
# Provide PL/SQL code here - call the stored proc within BEGIN and END block.
plsql_block = """
BEGIN
SCHEMA.STORED_PROC_NAME;
END;
"""
# Read the final table that is created / updated within the stored proc.
count_query = """
(
select count(*) from SCHMA.TABLE_NAME
) t1
"""
df = spark.read \
.format("jdbc") \
.option("url", "JDBC_URL") \
.option('driver', 'oracle.jdbc.driver.OracleDriver') \
.option("oracle.jdbc.timezoneAsRegion", "false") \
.option("sessionInitStatement", plsql_block) \
.option("dbtable", count_query) \
.option("user", "USER_ID") \
.option("password", "PASSWORD") \
.load()
print("Total Records")
df.show(10, False)
【讨论】:
【参考方案2】:Spark SQL 可用于读取/写入 Oracle 表,换句话说,您可以执行 select
、insert
和 delete
。您可以通过delete+insert
进行更新。您也可以调用function
作为 SQL 的一部分。但我认为您不能使用 Spark SQL 调用 stored procedure
,但您可以使用普通的旧 java/scala/python 语法来调用。我使用的策略是使用 Spark SQL 填充表,然后使用标准 JDBC 连接和 java 代码基于该表运行stored procedure
。驱动程序将在单个线程中执行此存储过程,显然您不会期望这里有可伸缩性。
spark.write.mode(SaveMode.append).jdbc(jdbcURL, tableName, jdbcProperties)
val con = DriverManager.getConnection(jdbcURL)
//execute the stored procedure using JDBC connection
con.close()
【讨论】:
【参考方案3】:你可以尝试做这样的事情,虽然我从来没有在任何实现中亲自尝试过
query = "exec SP_NAME"
empDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:oracle:thin:username/password@//hostname:portnumber/SID") \
.option("dbtable", query) \
.option("user", "db_user_name") \
.option("password", "password") \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.load()
【讨论】:
我想知道这个答案如何帮助调用 Oracle 中的存储过程?!以上是关于如何在查询中调用 oracle 存储过程?的主要内容,如果未能解决你的问题,请参考以下文章
oracle中怎么执行带有输出参数的存储过程,在程序中我知道怎么调用,