将 spark 数据帧写入 postgres 数据库

Posted

技术标签:

【中文标题】将 spark 数据帧写入 postgres 数据库【英文标题】:Write spark dataframe to postgres Database 【发布时间】:2016-08-08 09:40:13 【问题描述】:

spark集群设置如下:

conf['SparkConfiguration'] = SparkConf() \
.setMaster('yarn-client') \
.setAppName("test") \
.set("spark.executor.memory", "20g") \
.set("spark.driver.maxResultSize", "20g") \
.set("spark.executor.instances", "20")\
.set("spark.executor.cores", "3") \
.set("spark.memory.fraction", "0.2") \
.set("user", "test_user") \
.set("spark.executor.extraClassPath", "/usr/share/java/postgresql-jdbc3.jar")

当我尝试使用以下代码将数据帧写入 Postgres 数据库时:

from pyspark.sql import DataFrameWriter
my_writer = DataFrameWriter(df)

url_connect = "jdbc:postgresql://198.123.43.24:1234"
table = "test_result"
mode = "overwrite"
properties = "user":"postgres", "password":"password"

my_writer.jdbc(url_connect, table, mode, properties)

我遇到以下错误:

Py4JJavaError: An error occurred while calling o1120.jdbc.   
:java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:278)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:49)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)

谁能就此提供一些建议? 谢谢!

【问题讨论】:

【参考方案1】:

尝试 write.jdbc 并传递在 write.jdbc() 之外单独创建的参数。 还要检查可用于编写我的 postgres 的端口,对于 Postgres 9.6,端口是 5432,对于 Postgres 8.4,端口是 5433。

mode = "overwrite"
url = "jdbc:postgresql://198.123.43.24:5432/kockpit"
properties = "user": "postgres","password": "password","driver": "org.postgresql.Driver"
data.write.jdbc(url=url, table="test_result", mode=mode, properties=properties)

【讨论】:

【参考方案2】:

您是否下载了 PostgreSQL JDBC 驱动程序?在此处下载:https://jdbc.postgresql.org/download.html

对于 pyspark shell,您使用 SPARK_CLASSPATH 环境变量:

$ export SPARK_CLASSPATH=/path/to/downloaded/jar
$ pyspark

要通过 spark-submit 提交脚本,请使用 --driver-class-path 标志:

$ spark-submit --driver-class-path /path/to/downloaded/jar script.py

【讨论】:

【参考方案3】:

也许您可以尝试显式传递 JDBC 驱动程序类(请注意,您可能需要将驱动程序 jar 放在所有 spark 节点的类路径中):

df.write.option('driver', 'org.postgresql.Driver').jdbc(url_connect, table, mode, properties)

【讨论】:

感谢您的回复。它给出了以下错误消息: TypeError: 'DataFrameWriter' object is not callable @Yiliang,对不起,在 pyspark 中 write 不是函数,你应该使用 df.write 而不是 df.write()。我的错误 谢谢丹尼尔。现在我遇到了该行的 java.lang.NullPointerException。知道会出什么问题吗?

以上是关于将 spark 数据帧写入 postgres 数据库的主要内容,如果未能解决你的问题,请参考以下文章

Spark写入postgres慢

如何将流式数据帧写入 PostgreSQL?

将大型 Spark 数据帧从数据块写入 csv 失败

如何将 Spark 数据帧写入 impala 数据库

将 Spark 数据帧写入带分区的 CSV

将 Spark 数据帧写入带分区的 CSV