通过 JDBC 将数组从 spark 数据帧输入到 postgreSQL

Posted

技术标签:

【中文标题】通过 JDBC 将数组从 spark 数据帧输入到 postgreSQL【英文标题】:Inputting an array from spark dataframe to postgreSQL through JDBC 【发布时间】:2017-12-25 17:58:44 【问题描述】:

所以我需要维护一个带有结果的表并每隔一定时间向其中输入信息,因为 JDBC 和 spark 没有内置的 UPSERT 选项,而且我不能让自己在输入时让表空置结果还是要加倍,我自己构建了一个 UPSERT 函数。问题是我的dataFrame中有一个整数的WrappedArray,我似乎无法将它转换为一个java对象,让我将它插入到PreparedStatement中。 我的代码中的相关部分如下所示:

import java.sql._
val st: PreparedStatement = dbc.prepareStatement("""
INSERT INTO """ + table + """ as tb """ + sliced_columns + """
VALUES"""+"(" + "?, " * (columns.size - 1) + "?)"+"""
ON CONFLICT (id) 
DO UPDATE SET """ + column_name + """= CAST (? AS _int4), count_win=?, occurrences=?, "sumOccurrences"=?, win_rate=?  Where tb.id=?;
""")

如您所见,我尝试将 WrappedArray 编写为字符串,然后将其转换为 SQL 代码本身,但这感觉是一个非常糟糕的解决方案。

我将此作为输入部分,根据它是哪种列类型执行不同的操作:

for (single_type <- types)
      single_type._2 match 
        case "IntegerType" => st.setInt(counter + 1, x.getInt(counter))
        case "StringType" => st.setString(counter + 1, x.getString(counter))
        case "DoubleType" => st.setDouble(counter + 1, x.getDouble(counter))
        case "LongType" => st.setLong(counter + 1, x.getLong(counter))
        case _ => st.setArray(counter + 1, x.getList(counter).toArray().asInstanceOf[Array])
      

这会返回一个错误 Ljava.lang.Object;不能强制转换为 java.sql.Array。非常感谢任何帮助!

【问题讨论】:

【参考方案1】:

Array 是类型构造函数而不是类型:

import org.apache.spark.sql.Row

Row(Seq(1, 2, 3)).getList(0).toArray.asInstanceOf[Array[_]]

toArray(带类型)应该足够了

Row(Seq(1, 2, 3)).getList[Int](0).toArray

【讨论】:

感谢您的回答,我确实尝试过这个,但我得到的是错误:找到:Array[Object] (in scala) required: Array (in java.sql)【参考方案2】:

这个问题最终被命令 createArrayOf 解决了

st.setArray(counter + 1, conn.createArrayOf("int4", x.getList[Int](4).toArray()))

【讨论】:

以上是关于通过 JDBC 将数组从 spark 数据帧输入到 postgreSQL的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Spark 数据帧写入 impala 数据库

由于 PySpark 时间戳,将 Spark 数据帧保存到 Azure Synapse 时出现问题

如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件

Spark SQL .distinct()性能

从spark jdbc连接向数据库发送选项

将 Spark 数据帧写入 postgres db 时出错