PySpark读写PostgreSQL

Posted PostgreSQL

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark读写PostgreSQL相关的知识,希望对你有一定的参考价值。

之前写过一篇,是以mysql和SQLite为例,有网友反应使用PostgreSQL报错,本文我就整理下PySpark如何读写PostgreSQL数据库。

  • 下载PostgreSQL JDBC, 


!wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar \-O /Users/steven/spark/jars/postgresql-42.2.5.jar 
  • 连接Spark

记得设置 spark.driver.extraClassPath 指向刚刚下载的PostgreSQL JDBC Driver。

from pyspark.sql import SparkSessionfrom pyspark import SparkConf,SparkContext path_jdbc="/Users/steven/spark/jars/postgresql-42.2.5.jar"conf=SparkConf()conf.set("spark.driver.extraClassPath",path_jdbc)spark = SparkSession.builder\.master("local[*]")\.appName('jdbc PG')\.config(conf=conf)\.getOrCreate()
  • 准备数据

本文仍然以鸢尾花数据为例(iris.csv)

df = spark.read.csv(path = 'iris.csv', header = True,inferSchema = True)#字段名转小写,并把.替换成_df=df.toDF(*(c.replace('.', '_').lower() for c in df.columns))
df.limit(5).toPandas()


  • 把Spark DataFrame写入PostgreSQL及从PostgreSQL读取数据到Spark DataFrame

db_host = "127.0.0.1"db_port = 5432table_name = "iris"db_name = "steven"user = "steven"password = "password"db_url = "jdbc:postgresql://{}:{}/{}".format( db_host , db_port , db_name)
options = { "url": db_url, "dbtable": table_name,    "user": user,    "password": password, "driver": "org.postgresql.Driver", "numPartitions": 10,}


    • overwrite模式,如果表存在,删除,重建

df.write.format('jdbc')\.options(**options)\.mode("overwrite").save()df1=spark.read.format('jdbc')\.options(**options).load()df1.count()#返回 150


    • append模式,表存在,追加

df.write.format('jdbc')\.options(**options)\.mode("append").save()df1=spark.read.format('jdbc')\.options(**options).load()df1.count()#返回 300


    • 读取下推查询(可以理解为视图)

table_setosa="""(select * from iris where species='setosa') as setosa"""
options['dbtable']=table_setosadf1=spark.read.format('jdbc').options(**options).load()df1.count()#返回50
  • 换个API,试试

properties = {    "user": user,    "password": password, "driver": "org.postgresql.Driver", "numPartitions": "10",}
#表名带schema的测试df.write.jdbc( url=db_url, table='plp.iris12', properties=properties,          mode="overwrite")          df2=spark.read.jdbc( url=db_url, table='plp.iris12', properties=properties)df2.count()#返回150


df2=spark.read.jdbc( url=db_url, table="(select * from plp.iris12 where species='setosa') as setosa ", properties=properties)
df2.count()#返回50


关闭Spark

spark.stop()


参考,

  • https://jdbc.postgresql.org/download/

  • https://github.com/alitrack/pyspark/blob/master/iris.csv


  • https://github.com/alitrack/pyspark/blob/master/iris.csv


衡数提供下列服务,有意请留言

  • PySpark培训

  • Excel培训

  • 数据挖掘咨询

  • 数据挖掘外包

  • 人才推荐



以上是关于PySpark读写PostgreSQL的主要内容,如果未能解决你的问题,请参考以下文章

从 Spark/pyspark 连接到 PostgreSQL

pyspark对Mysql数据库进行读写

postgresql已经配置好主从,java中怎么进行读写分离

PostgreSQL 是不是支持表(片段)的透明压缩?

pyspark文件读写示例-(CSV/JSON/Parquet-单个或多个)

如何使用 PySpark 将 JSON 列类型写入 Postgres?