如何使用 JDBC 源在 (Pyspark?
Posted
技术标签:
【中文标题】如何使用 JDBC 源在 (Pyspark?【英文标题】:How to use JDBC source to write and read data in (Py)Spark? 【发布时间】:2015-06-22 15:30:15 【问题描述】:本题的目的是记录:
在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤
JDBC 源和已知解决方案可能存在的问题
只需稍加改动,这些方法就可以与其他支持的语言(包括 Scala 和 R)一起使用。
【问题讨论】:
【参考方案1】:写入数据
在提交应用程序或启动 shell 时包含适用的 JDBC 驱动程序。例如,您可以使用--packages
:
bin/pyspark --packages group:name:version
或者结合driver-class-path
和jars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
这些属性也可以在JVM实例启动前使用PYSPARK_SUBMIT_ARGS
环境变量设置,或者使用conf/spark-defaults.conf
设置spark.jars.packages
或spark.jars
/spark.driver.extraClassPath
。
选择所需的模式。 Spark JDBC writer 支持以下模式:
append
:将此 :class:DataFrame
的内容附加到现有数据中。overwrite
:覆盖现有数据。ignore
:如果数据已经存在,则忽略此操作。error
(默认情况):如果数据已经存在,则抛出异常。
Upserts 或其他细粒度修改are not supported
mode = ...
准备JDBC URI,例如:
# You can encode credentials in URI or pass
# separately using properties argument
# of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
(可选)创建 JDBC 参数字典。
properties =
"user": "foo",
"password": "bar"
properties
/options
也可以用来设置supported JDBC connection properties。
使用DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
保存数据(详见pyspark.sql.DataFrameWriter
)。
已知问题:
使用--packages
(java.sql.SQLException: No suitable driver found for jdbc: ...
) 包含驱动程序时找不到合适的驱动程序
假设没有驱动程序版本不匹配来解决这个问题,您可以将driver
类添加到properties
。例如:
properties =
...
"driver": "org.postgresql.Driver"
使用df.write.format("jdbc").options(...).save()
可能会导致:
java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource 不允许创建表作为选择。
解决方案未知。
在 Pyspark 1.3 中你可以尝试直接调用 Java 方法:
df._jdf.insertIntoJDBC(url, "baz", True)
读取数据
按照写入数据中的步骤 1-4
使用sqlContext.read.jdbc
:
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
或sqlContext.read.format("jdbc")
:
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
已知问题和陷阱:
找不到合适的驱动程序 - 请参阅:写入数据
Spark SQL 支持使用 JDBC 源进行谓词下推,但并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是将 dbtable
/ table
参数替换为有效的子查询。例如:
默认情况下,JDBC 数据源使用单个执行线程顺序加载数据。为确保分布式数据加载,您可以:
提供分区column
(必须是IntegerType
)、lowerBound
、upperBound
、numPartitions
。
提供一个互斥谓词列表predicates
,每个所需分区一个。
见:
Partitioning in spark while reading from RDBMS via JDBC, How to optimize partitioning when migrating data from JDBC source?, How to improve performance for slow Spark jobs using DataFrame and JDBC connection? How to partition Spark RDD when importing Postgres using JDBC?在分布式模式下(使用分区列或谓词),每个执行程序都在自己的事务中运行。如果同时修改源数据库,则无法保证最终视图一致。
在哪里可以找到合适的驱动程序:
Maven Repository(要获得--packages
所需的坐标,请选择所需的版本并以compile-group:name:version
的形式从Gradle 选项卡复制数据,替换相应的字段)或Maven Central Repository:
其他选项
根据数据库,可能存在专门的源,并且在某些情况下是首选的:
Greenplum - Pivotal Greenplum-Spark Connector Apache Phoenix - Apache Spark Plugin Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server Amazon Redshift - Databricks Redshift connector(当前版本仅在专有 Databricks 运行时可用。Discontinued open source version, available on GitHub)。【讨论】:
mode="overwrite" 使用这个命令:spark_submit --driver-class-path /xx/yy/postgresql-xx.jar my-script.py【参考方案2】:下载mysql-connector-java驱动并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"
df.write.jdbc(mysql_url,table="actor1",mode="append")
【讨论】:
【参考方案3】:参考此链接下载 jdbc for postgres 并按照步骤下载 jar 文件
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar 文件将在这样的路径中下载。 "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"
如果你的 spark 版本是 2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
将文件另存为python并运行“python相应文件名.py”
【讨论】:
以上是关于如何使用 JDBC 源在 (Pyspark?的主要内容,如果未能解决你的问题,请参考以下文章
通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?
PySpark:如何使用带有 JDBC 连接的 MySQL 函数?
如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?