使用 Apache Spark 1.4.0 写入 Oracle 数据库

Posted

技术标签:

【中文标题】使用 Apache Spark 1.4.0 写入 Oracle 数据库【英文标题】:Writing to Oracle Database using Apache Spark 1.4.0 【发布时间】:2015-07-08 08:22:45 【问题描述】:

我正在尝试使用 Spark 1.4.0 DataFrame.write.jdbc() 函数将一些数据写入我们的 Oracle 数据库。

用于将数据从 Oracle 数据库读取到 DataFrame 对象的对称 read.jdbc() 函数运行良好。但是,当我写回数据帧时(我也尝试编写与从数据库设置 CverWrite 获得的完全相同的对象)给出以下异常:

Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00902: Ungültiger Datentyp

    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1075)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
    at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3897)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1361)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252)
    at main3$.main(main3.scala:72)
    at main3.main(main3.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

该表有 2 个基本字符串列。当它们是Integer时,它也可以写它。

实际上,当我更深入时,我意识到它将 StringType 映射到 Oracle 无法识别的“TEXT”(应该改为“VARCHAR”)。代码来自 jdbc.scala,可以在 GitHub 找到:

def schemaString(df: DataFrame, url: String): String = 
      val sb = new StringBuilder()
      val dialect = JdbcDialects.get(url)
      df.schema.fields foreach  field => 
        val name = field.name
        val typ: String =
          dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
          field.dataType match 
            case IntegerType => "INTEGER"
            case LongType => "BIGINT"
            case DoubleType => "DOUBLE PRECISION"
            case FloatType => "REAL"
            case ShortType => "INTEGER"
            case ByteType => "BYTE"
            case BooleanType => "BIT(1)"
            case StringType => "TEXT"
            case BinaryType => "BLOB"
            case TimestampType => "TIMESTAMP"
            case DateType => "DATE"
            case DecimalType.Unlimited => "DECIMAL(40,20)"
            case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
          )
        val nullable = if (field.nullable) "" else "NOT NULL"
        sb.append(s", $name $typ $nullable")
      
      if (sb.length < 2) "" else sb.substring(2)
    

所以问题是我在什么地方弄错了,或者 SparkSQL 不支持 Oracle,我应该安装一个插件来将 SparkSQL 与 Oracle 一起使用吗?

我的简单主要是:

val conf = new SparkConf().setAppName("Parser").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)



val reader = sqlContext.read
val frame = reader.jdbc(url,"STUDENTS",connectionprop)

frame.printSchema()
frame.show()


val row = Row("3","4")


val struct =
  StructType(
    StructField("ONE", StringType, true) ::
      StructField("TWO", StringType, true) :: Nil)

val arr = Array(row)
val rddRow = sc.parallelize(arr)
val dframe = sqlContext.createDataFrame(rddRow,struct
)
dframe.printSchema()
dframe.show()

dframe.write.jdbc(url,"STUDENTS",connectionprop)

【问题讨论】:

【参考方案1】:

实际答案 - 使用 1.4.0 中现有的 DataFrame.write.jdbc() 实现回写 Oracle 是不可能的,但如果您不介意升级到 Spark 1.5,则可以使用一些 hackish 方法. 如here所述有两个问题:

检查表是否存在的简单方法与 oracle 不兼容

SELECT 1 FROM $table LIMIT 1

可以通过直接保存表实用程序方法轻松避免

org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(df, url, table, props)

而且很难(正如您已经猜到的那样)- 没有开箱即用的 Oracle 特定数据类型方言。取自同一篇解决方案:

import org.apache.spark.sql.jdbc.JdbcDialects, JdbcType, JdbcDialect
import org.apache.spark.sql.types._

  val OracleDialect = new JdbcDialect 
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")

    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match 
      case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
      case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
      case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
      case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
      case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
      case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
      case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
      case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
      case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
      case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
      case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
//      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
      case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
      case _ => None
    
  

    JdbcDialects.registerDialect(OracleDialect)

所以,最后,工作示例应该类似于

  val url: String = "jdbc:oracle:thin:@your_domain:1521/dbname"
  val driver: String = "oracle.jdbc.OracleDriver"
  val props = new java.util.Properties()
  props.setProperty("user", "username")
  props.setProperty("password", "userpassword")
  org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(dataFrame, url, "table_name", props)

【讨论】:

不幸的是,这个 API 是 Spark 内部的。因此可以为以后的版本破坏代码。情况如下:在 Spark 2.1 中,第四个参数已从 Property 更改为 JDBCOptions。请看issues.apache.org/jira/browse/SPARK-19296?filter=-2【参考方案2】:

更新:从 Spark 2.x 开始

在创建 jdbc 表时,Spark 中的每个 columnName 都被双引号括起来,因此当您尝试通过 sqlPlus 查询所有 Oracle 表 columnNames 时,它们都会区分大小写。

select colA from myTable; => doesn't works anymore
select "colA" from myTable; =>  works

[解决方案] Dataframe to Oracle creates table with case sensitive column

【讨论】:

【参考方案3】:

您可以使用org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable。就像 Aerondir 说的那样。

【讨论】:

以上是关于使用 Apache Spark 1.4.0 写入 Oracle 数据库的主要内容,如果未能解决你的问题,请参考以下文章

Apache spark - JDBC 写入,中间提交

使用经 EMRFS S3 优化的提交器提高 Apache Spark 写入 Apache Parquet 格式文件的性能

Apache Spark 结构化流 (DataStreamWriter) 写入 Hive 表

从 Azure Synapse 中的 Apache Spark 将数据写入 SQL DW

在 Apache Spark 中,用 Java 将数据帧写入 Hive 表

使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件