如何使用 JDBC 源在 (Pyspark?

Posted

技术标签:

【中文标题】如何使用 JDBC 源在 (Pyspark?【英文标题】:How to use JDBC source to write and read data in (Py)Spark? 【发布时间】:2015-06-22 15:30:15 【问题描述】:

本题的目的是记录:

在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤

JDBC 源和已知解决方案可能存在的问题

只需稍加改动,这些方法就可以与其他支持的语言(包括 Scala 和 R)一起使用。

【问题讨论】:

【参考方案1】:

写入数据

    在提交应用程序或启动 shell 时包含适用的 JDBC 驱动程序。例如,您可以使用--packages:

     bin/pyspark --packages group:name:version  
    

或者结合driver-class-pathjars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

这些属性也可以在JVM实例启动前使用PYSPARK_SUBMIT_ARGS环境变量设置,或者使用conf/spark-defaults.conf设置spark.jars.packagesspark.jars/spark.driver.extraClassPath

    选择所需的模式。 Spark JDBC writer 支持以下模式:

    append:将此 :class:DataFrame 的内容附加到现有数据中。 overwrite:覆盖现有数据。 ignore:如果数据已经存在,则忽略此操作。 error(默认情况):如果数据已经存在,则抛出异常。

    Upserts 或其他细粒度修改are not supported

     mode = ...
    

    准备JDBC URI,例如:

     # You can encode credentials in URI or pass
     # separately using properties argument
     # of jdbc method or options
    
     url = "jdbc:postgresql://localhost/foobar"
    

    (可选)创建 JDBC 参数字典。

     properties = 
         "user": "foo",
         "password": "bar"
     
    

    properties/options也可以用来设置supported JDBC connection properties。

    使用DataFrame.write.jdbc

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

保存数据(详见pyspark.sql.DataFrameWriter)。

已知问题

使用--packages (java.sql.SQLException: No suitable driver found for jdbc: ...) 包含驱动程序时找不到合适的驱动程序

假设没有驱动程序版本不匹配来解决这个问题,您可以将driver 类添加到properties。例如:

  properties = 
      ...
      "driver": "org.postgresql.Driver"
  

使用df.write.format("jdbc").options(...).save() 可能会导致:

java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource 不允许创建表作为选择。

解决方案未知。

在 Pyspark 1.3 中你可以尝试直接调用 Java 方法:

  df._jdf.insertIntoJDBC(url, "baz", True)

读取数据

    按照写入数据中的步骤 1-4

    使用sqlContext.read.jdbc

     sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())

已知问题和陷阱

找不到合适的驱动程序 - 请参阅:写入数据

Spark SQL 支持使用 JDBC 源进行谓词下推,但并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是将 dbtable / table 参数替换为有效的子查询。例如:

Does spark predicate pushdown work with JDBC? More than one hour to execute pyspark.sql.DataFrame.take(4) How to use SQL query to define table in dbtable?

默认情况下,JDBC 数据源使用单个执行线程顺序加载数据。为确保分布式数据加载,您可以:

提供分区column(必须是IntegerType)、lowerBoundupperBoundnumPartitions。 提供一个互斥谓词列表predicates,每个所需分区一个。

见:

Partitioning in spark while reading from RDBMS via JDBC, How to optimize partitioning when migrating data from JDBC source?, How to improve performance for slow Spark jobs using DataFrame and JDBC connection? How to partition Spark RDD when importing Postgres using JDBC?

在分布式模式下(使用分区列或谓词),每个执行程序都在自己的事务中运行。如果同时修改源数据库,则无法保证最终视图一致。

在哪里可以找到合适的驱动程序:

Maven Repository(要获得--packages 所需的坐标,请选择所需的版本并以compile-group:name:version 的形式从Gradle 选项卡复制数据,替换相应的字段)或Maven Central Repository:

PostgreSQL mysql

其他选项

根据数据库,可能存在专门的源,并且在某些情况下是首选的:

Greenplum - Pivotal Greenplum-Spark Connector Apache Phoenix - Apache Spark Plugin Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server Amazon Redshift - Databricks Redshift connector(当前版本仅在专有 Databricks 运行时可用。Discontinued open source version, available on GitHub)。

【讨论】:

mode="overwrite" 使用这个命令:spark_submit --driver-class-path /xx/yy/postgresql-xx.jar my-script.py【参考方案2】:

下载mysql-connector-java驱动并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构

    spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()

    sc = spark.sparkContext

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()

    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"

    df.write.jdbc(mysql_url,table="actor1",mode="append")

【讨论】:

【参考方案3】:

参考此链接下载 jdbc for postgres 并按照步骤下载 jar 文件

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar 文件将在这样的路径中下载。 "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

如果你的 spark 版本是 2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

将文件另存为python并运行“python相应文件名.py”

【讨论】:

以上是关于如何使用 JDBC 源在 (Pyspark?的主要内容,如果未能解决你的问题,请参考以下文章

通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?

PySpark:如何使用带有 JDBC 连接的 MySQL 函数?

如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?

如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件

如何在 PySpark 中连接到 Presto JDBC?

我们如何使用 jdbc 执行连接查询,而不是使用 pyspark 获取多个表