从 Spark/pyspark 连接到 PostgreSQL
Posted
技术标签:
【中文标题】从 Spark/pyspark 连接到 PostgreSQL【英文标题】:Connecting from Spark/pyspark to PostgreSQL 【发布时间】:2015-06-19 14:22:36 【问题描述】:我在一台 Windows 机器上安装了 Spark,并希望通过 Spyder 使用它。经过一些故障排除后,基础似乎工作:
import os
os.environ["SPARK_HOME"] = "D:\Analytics\Spark\spark-1.4.0-bin-hadoop2.6"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
spark_config = SparkConf().setMaster("local[8]")
sc = SparkContext(conf=spark_config)
sqlContext = SQLContext(sc)
textFile = sc.textFile("D:\\Analytics\\Spark\\spark-1.4.0-bin-hadoop2.6\\README.md")
textFile.count()
textFile.filter(lambda line: "Spark" in line).count()
sc.stop()
这按预期运行。我现在想连接到在同一台服务器上运行的 Postgres9.3 数据库。我已经从这里here 下载了 JDBC 驱动程序并将其放在文件夹 D:\Analytics\Spark\spark_jars 中。然后我创建了一个新文件 D:\Analytics\Spark\spark-1.4.0-bin-hadoop2.6\conf\spark-defaults.conf 包含这一行:
spark.driver.extraClassPath 'D:\\Analytics\\Spark\\spark_jars\\postgresql-9.3-1103.jdbc41.jar'
我已经运行了以下代码来测试连接
import os
os.environ["SPARK_HOME"] = "D:\Analytics\Spark\spark-1.4.0-bin-hadoop2.6"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
spark_config = SparkConf().setMaster("local[8]")
sc = SparkContext(conf=spark_config)
sqlContext = SQLContext(sc)
df = (sqlContext
.load(source="jdbc",
url="jdbc:postgresql://[hostname]/[database]?user=[username]&password=[password]",
dbtable="pubs")
)
sc.stop()
但出现以下错误:
Py4JJavaError: An error occurred while calling o22.load.
: java.sql.SQLException: No suitable driver found for jdbc:postgresql://uklonana01/stonegate?user=analytics&password=pMOe8jyd
at java.sql.DriverManager.getConnection(Unknown Source)
at java.sql.DriverManager.getConnection(Unknown Source)
at org.apache.spark.sql.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:118)
at org.apache.spark.sql.jdbc.JDBCRelation.<init>(JDBCRelation.scala:128)
at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:113)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:265)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)
如何检查我是否下载了正确的 .jar 文件或错误可能来自何处?
【问题讨论】:
我已经尝试过 postgresql-9.3-1103.jdbc41.jar 和很多其他 .jar 文件。我也尝试添加#s.environ["SPARK_CLASSPATH"]="D:\\Analytics\\Spark\\spark_jars\\*"
,但这会产生错误Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext. : org.apache.spark.SparkException: Found both spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former.
,这意味着上述版本应该可以工作。
【参考方案1】:
我尝试过 SPARK_CLASSPATH 环境变量,但它不适用于 Spark 1.6。
来自以下帖子的其他答案建议添加 pyspark 命令参数,它可以工作。
Not able to connect to postgres using jdbc in pyspark shell
Apache Spark : JDBC connection not working
pyspark --conf spark.executor.extraClassPath=<jdbc.jar> --driver-class-path <jdbc.jar> --jars <jdbc.jar> --master <master-URL>
【讨论】:
【参考方案2】:删除 spark-defaults.conf 并将 SPARK_CLASSPATH 添加到 python 中的系统环境,如下所示:
os.environ["SPARK_CLASSPATH"] = 'PATH\\TO\\postgresql-9.3-1101.jdbc41.jar'
【讨论】:
【参考方案3】:另一种将 pyspark 与您的 postrgresql 数据库连接的方法。
1) 使用 pip 安装 spark:pip install pyspark
2) 下载最新版本的 jdbc postgresql 连接器: https://jdbc.postgresql.org/download.html
3) 使用您的数据库凭据完成此代码:
from __future__ import print_function
from pyspark.sql import SparkSession
def jdbc_dataset_example(spark):
df = spark.read \
.jdbc("jdbc:postgresql://[your_db_host]:[your_db_port]/[your_db_name]",
"com_dim_city",
properties="user": "[your_user]", "password": "[your_password]")
df.createOrReplaceTempView("[your_table]")
sqlDF = spark.sql("SELECT * FROM [your_table] LIMIT 10")
sqlDF.show()
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.getOrCreate()
jdbc_dataset_example(spark)
spark.stop()
最后运行你的应用程序:
spark-submit --driver-class-path /path/to/your_jdbc_jar/postgresql-42.2.6.jar --jars postgresql-42.2.6.jar /path/to/your_jdbc_jar/test_pyspark_to_postgresql.py
【讨论】:
以上是关于从 Spark/pyspark 连接到 PostgreSQL的主要内容,如果未能解决你的问题,请参考以下文章
使用 sqlalchemy 连接到本地 postgresql