Reading and Writing PostgreSQL with PySpark
I previously wrote a similar post using MySQL and SQLite as examples. Some readers reported errors when trying the same steps with PostgreSQL, so this post walks through reading and writing a PostgreSQL database from PySpark.
Download the PostgreSQL JDBC driver:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar \
-O /Users/steven/spark/jars/postgresql-42.2.5.jar
Connect to Spark
Remember to set spark.driver.extraClassPath to the PostgreSQL JDBC driver you just downloaded.
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

path_jdbc = "/Users/steven/spark/jars/postgresql-42.2.5.jar"
conf = SparkConf()
conf.set("spark.driver.extraClassPath", path_jdbc)
spark = SparkSession.builder\
.master("local[*]")\
.appName('jdbc PG')\
.config(conf=conf)\
.getOrCreate()
Prepare the data
This post again uses the iris dataset (iris.csv) as an example:
df = spark.read.csv(path='iris.csv', header=True, inferSchema=True)
# lowercase the column names and replace '.' with '_'
df=df.toDF(*(c.replace('.', '_').lower() for c in df.columns))
df.limit(5).toPandas()
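The renaming step above is worth isolating: the default iris.csv headers contain dots, which PostgreSQL would treat as schema separators unless quoted. A pure-Python sketch of the same cleanup logic (the header names below are an assumption about iris.csv, matching the usual distribution):

```python
# Sketch of the column-name cleanup used above: lowercase each name and
# replace '.' with '_' so it is a valid unquoted PostgreSQL identifier.
def clean_columns(columns):
    return [c.replace('.', '_').lower() for c in columns]

# Assumed iris.csv headers, containing dots:
raw = ['Sepal.Length', 'Sepal.Width', 'Petal.Length', 'Petal.Width', 'Species']
print(clean_columns(raw))
# ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']
```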
Writing a Spark DataFrame to PostgreSQL, and reading it back into a Spark DataFrame
db_host = "127.0.0.1"
db_port = 5432
table_name = "iris"
db_name = "steven"
user = "steven"
password = "password"
db_url = "jdbc:postgresql://{}:{}/{}".format(
db_host
, db_port
, db_name)
options = {
"url": db_url,
"dbtable": table_name,
"user": user,
"password": password,
"driver": "org.postgresql.Driver",
"numPartitions": 10,
}
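One caveat worth noting: with only numPartitions set, Spark caps the number of concurrent JDBC connections but still reads the whole table through a single partition. To actually split the read, the standard Spark JDBC options partitionColumn, lowerBound, and upperBound are also required. A hedged sketch; the column name and bounds below are assumptions about the iris table, not from the original post:

```python
# Hedged sketch: options for a partitioned JDBC read. partitionColumn
# must be numeric (or date/timestamp); the bounds only set the partition
# stride -- rows outside them still land in the first/last partition.
parallel_options = {
    "url": "jdbc:postgresql://127.0.0.1:5432/steven",
    "dbtable": "iris",
    "user": "steven",
    "password": "password",
    "driver": "org.postgresql.Driver",
    "numPartitions": 10,
    "partitionColumn": "sepal_length",  # assumed numeric column
    "lowerBound": 4,                    # approximate column minimum
    "upperBound": 8,                    # approximate column maximum
}
# df1 = spark.read.format('jdbc').options(**parallel_options).load()
```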
overwrite mode: if the table exists, it is dropped and recreated
df.write.format('jdbc')\
.options(**options)\
.mode("overwrite").save()
df1=spark.read.format('jdbc')\
.options(**options).load()
df1.count()
# returns
150
append mode: if the table exists, rows are appended
df.write.format('jdbc')\
.options(**options)\
.mode("append").save()
df1=spark.read.format('jdbc')\
.options(**options).load()
df1.count()
# returns
300
Reading with a pushdown query (think of it as a view)
table_setosa="""(select * from iris
where species='setosa') as setosa"""
options['dbtable']=table_setosa
df1=spark.read.format('jdbc').options(**options).load()
df1.count()
# returns
50
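As an aside not in the original post: since Spark 2.4, the same pushdown can be expressed with the query option instead of wrapping a subquery in dbtable. The two options are mutually exclusive, so dbtable must be dropped:

```python
# Hedged sketch: pushdown via the 'query' option (Spark 2.4+).
pushdown_options = {
    "url": "jdbc:postgresql://127.0.0.1:5432/steven",
    "user": "steven",
    "password": "password",
    "driver": "org.postgresql.Driver",
    # 'query' replaces 'dbtable'; no "(...) as alias" wrapper is needed.
    "query": "select * from iris where species = 'setosa'",
}
# df1 = spark.read.format('jdbc').options(**pushdown_options).load()
```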
Let's try another API
properties = {
"user": user,
"password": password,
"driver": "org.postgresql.Driver",
"numPartitions": "10",
}
# test with a schema-qualified table name
df.write.jdbc(
url=db_url,
table='plp.iris12',
properties=properties,
mode="overwrite")
df2=spark.read.jdbc(
url=db_url,
table='plp.iris12',
properties=properties
)
df2.count()
# returns
150
df2=spark.read.jdbc(
url=db_url,
table="(select * from plp.iris12 where species='setosa') as setosa ",
properties=properties
)
df2.count()
# returns
50
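spark.read.jdbc also accepts a predicates argument: each WHERE-clause string becomes one partition of the read, which parallelizes the scan without needing a numeric partition column. A sketch; the three species values are assumptions about the iris data:

```python
# Each predicate becomes one read partition; the three iris species
# (assumed disjoint and complete) yield three partitions.
predicates = [
    "species = 'setosa'",
    "species = 'versicolor'",
    "species = 'virginica'",
]
# df3 = spark.read.jdbc(url=db_url, table='plp.iris12',
#                       predicates=predicates, properties=properties)
```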
Stop Spark
spark.stop()
References:
https://jdbc.postgresql.org/download/
https://github.com/alitrack/pyspark/blob/master/iris.csv