从 Python 连接和测试 JDBC 驱动程序
Posted
技术标签:
【中文标题】从 Python 连接和测试 JDBC 驱动程序【英文标题】:Connecting and testing a JDBC driver from Python 【发布时间】:2018-03-03 08:49:12 【问题描述】:我正在尝试使用 Python 对我们的 JDBC 驱动程序进行一些测试。
最初弄清楚 JPype,我最终设法连接驱动程序并像这样执行选择查询(再现一个通用的 sn-p):
from __future__ import print_function
from jpype import *
#Start JVM, attach the driver jar
jvmpath = 'path/to/libjvm.so'
classpath = 'path/to/JDBC_Driver.jar'
startJVM(jvmpath, '-ea', '-Djava.class.path=' + classpath)
# Magic line 1
driver = JPackage('sql').Our_Driver
# Initiating a connection via DriverManager()
jdbc_uri = 'jdbc:our_database://localhost:port/database','user', 'passwd')
conn = java.sql.DriverManager.getConnection(jdbc_uri)
# Executing a statement
stmt = conn.createStatement()
rs = stmt.executeQuery ('select top 10 * from some_table')
# Extracting results
while rs.next():
''' Magic #2 - rs.getStuff() only works inside a while loop '''
print (rs.getString('col_name'))
但是,我未能批量插入,这是我想要测试的。即使executeBatch()
返回一个 jpype int[],这应该表明插入成功,表也没有更新。
然后我决定尝试 py4j。
我的困境 - 我很难弄清楚如何做与上述相同的事情。据说py4j不会自己启动JVM,Java代码需要预先安排一个GatewayServer(),所以我不确定它是否可行。
另一方面,有一个名为py4jdbc 的库就是这样做的。
我修改了 dbapi.py 代码,但不太了解流程,而且几乎卡住了。
如果有人了解如何使用 py4j 从 .jar 文件加载 JDBC 驱动程序并能指出正确的方向,我将不胜感激。
【问题讨论】:
【参考方案1】:在添加记录之后和检索之前添加一个提交。
conn.commit()
【讨论】:
好吧,嘘**..谢谢。但是,返回 int[] 是罕见且随机的,大多数情况下我不会在 executeBatch() 之后获得返回值,即使 setInt()、setString()、addBatch() 都通过了。 默认情况下,JDBC 驱动程序必须处于自动提交模式。在这种情况下调用commit()
应该会引发异常。
这里有一个用于自动提交的字符串:java.sql.Connection.setAutocommit()。此外,像 sqlalchemy 这样的东西可能会有用。
docs.sqlalchemy.org/en/latest/core/tutorial.html 在“选择”标题下。展示如何使用他们的模块来查询 Python 原生的数据库,并避免将字符串作为查询发送。我自己只是学习这个模块。
docs.sqlalchemy.org/en/rel_1_0/orm/… - 如果你有兴趣的话。【参考方案2】:
我在airflow中遇到过类似的问题,我用teradata jdbc jars和jaydebeapi连接teradata数据库并执行sql:
[root@myhost transfer]# cat test_conn.py
import jaydebeapi
from contextlib import closing
jclassname='com.teradata.jdbc.TeraDriver'
jdbc_driver_loc = '/opt/spark-2.3.1/jars/terajdbc4-16.20.00.06.jar,/opt/spark-2.3.1/jars/tdgssconfig-16.20.00.06.jar'
jdbc_driver_name = 'com.teradata.jdbc.TeraDriver'
host='my_teradata.address'
url='jdbc:teradata://' + host + '/TMODE=TERA'
login="teradata_user_name"
psw="teradata_passwd"
sql = "SELECT COUNT(*) FROM A_TERADATA_TABLE_NAME where month_key='202009'"
conn = jaydebeapi.connect(jclassname=jdbc_driver_name,
url=url,
driver_args=[login, psw],
jars=jdbc_driver_loc.split(","))
with closing(conn) as conn:
with closing(conn.cursor()) as cur:
cur.execute(sql)
print(cur.fetchall())
[root@myhost transfer]# python test_conn.py
[(7734133,)]
[root@myhost transfer]#
【讨论】:
【参考方案3】:在 py4j 中,使用您各自的 JDBC uri:
from py4j.JavaGateway import java_gateway
# Open JVM interface with the JDBC Jar
jdbc_jar_path = '/path/to/jdbc_driver.jar'
gateway = java_gateway(classpath=jdbc_jar_path)
# Load the JDBC Jar
jdbc_class = "com.vendor.VendorJDBC"
gateway.jvm.class.forName(jdbc_class)
# Initiate connection
jdbc_uri = "jdbc://vendor:192.168.x.y:zzzz;..."
con = gateway.jvm.DriverManager.getConnection(jdbc_uri)
# Run a query
sql = "select this from that"
stmt = con.createStatement(sql)
rs = stmt.executeQuery()
while rs.next():
rs.getInt(1)
rs.getFloat(2)
.
.
rs.close()
stmt.close()
【讨论】:
以上是关于从 Python 连接和测试 JDBC 驱动程序的主要内容,如果未能解决你的问题,请参考以下文章