通过 JDBC 将数组从 spark 数据帧输入到 postgreSQL
Posted
技术标签:
【中文标题】通过 JDBC 将数组从 spark 数据帧输入到 postgreSQL【英文标题】:Inputting an array from spark dataframe to postgreSQL through JDBC 【发布时间】:2017-12-25 17:58:44 【问题描述】:所以我需要维护一个带有结果的表并每隔一定时间向其中输入信息,因为 JDBC 和 spark 没有内置的 UPSERT 选项,而且我不能让自己在输入时让表空置结果还是要加倍,我自己构建了一个 UPSERT 函数。问题是我的dataFrame中有一个整数的WrappedArray,我似乎无法将它转换为一个java对象,让我将它插入到PreparedStatement中。 我的代码中的相关部分如下所示:
import java.sql._
val st: PreparedStatement = dbc.prepareStatement("""
INSERT INTO """ + table + """ as tb """ + sliced_columns + """
VALUES"""+"(" + "?, " * (columns.size - 1) + "?)"+"""
ON CONFLICT (id)
DO UPDATE SET """ + column_name + """= CAST (? AS _int4), count_win=?, occurrences=?, "sumOccurrences"=?, win_rate=? Where tb.id=?;
""")
如您所见,我尝试将 WrappedArray 编写为字符串,然后将其转换为 SQL 代码本身,但这感觉是一个非常糟糕的解决方案。
我将此作为输入部分,根据它是哪种列类型执行不同的操作:
for (single_type <- types)
single_type._2 match
case "IntegerType" => st.setInt(counter + 1, x.getInt(counter))
case "StringType" => st.setString(counter + 1, x.getString(counter))
case "DoubleType" => st.setDouble(counter + 1, x.getDouble(counter))
case "LongType" => st.setLong(counter + 1, x.getLong(counter))
case _ => st.setArray(counter + 1, x.getList(counter).toArray().asInstanceOf[Array])
这会返回一个错误 Ljava.lang.Object;不能强制转换为 java.sql.Array。非常感谢任何帮助!
【问题讨论】:
【参考方案1】:Array
是类型构造函数而不是类型:
import org.apache.spark.sql.Row
Row(Seq(1, 2, 3)).getList(0).toArray.asInstanceOf[Array[_]]
但toArray
(带类型)应该足够了
Row(Seq(1, 2, 3)).getList[Int](0).toArray
【讨论】:
感谢您的回答,我确实尝试过这个,但我得到的是错误:找到:Array[Object] (in scala) required: Array (in java.sql)【参考方案2】:这个问题最终被命令 createArrayOf 解决了
st.setArray(counter + 1, conn.createArrayOf("int4", x.getList[Int](4).toArray()))
【讨论】:
以上是关于通过 JDBC 将数组从 spark 数据帧输入到 postgreSQL的主要内容,如果未能解决你的问题,请参考以下文章
由于 PySpark 时间戳,将 Spark 数据帧保存到 Azure Synapse 时出现问题