将 spark current_timestamp 值推送到带有 timezone 列的 postgres 时间戳
Posted
技术标签:
【中文标题】将 spark current_timestamp 值推送到带有 timezone 列的 postgres 时间戳【英文标题】:Push spark current_timestamp value to postgres timestamp with timezone column 【发布时间】:2022-01-11 14:26:04 【问题描述】:我需要将“start_time”值推送到具有“timestamp with timezone”数据类型的列的 postgres 表中。我只需要 java jdbc 连接中的解决方案。
import spark.implicits._
val df1 =Seq(("Process Name","Process Description"))
.toDF("process_nm","process_desc")
val df2 = df1.withColumn("start_time",current_timestamp)
df2.show(false)
df2.printSchema
df2.collect().foreach(row=>
println("Before calling-"+row.getString(0)+" "+row.getString(1)+"
"+row.getTimestamp(2))
var process_name:String=row.getString(0)
var process_description:String=row.getString(1)
var start_time=row.getTimestamp(2)
var insertSql="""insert into test_log(process_nm,start_time,process_desc)
values('$process_name','$start_time','$process_description')"""
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.concat, lit
import java.io.File
import java.sql.Connection, DriverManager
var db_conn_string = "jdbc:" + db_type + "://" + db_host + ":" + db_port + "/" + db_database
val direct_conn = DriverManager.getConnection(db_conn_string, db_user, db_pass)
val statement = direct_conn.createStatement()
val result=statement.executeUpdate(insertSql)
println("inserted-"+result)
println("insert_sql-"+insertSql)
)
将 start_time 推送到 postgres 表时出现以下错误
Before calling-Process Name Process Description 2021-12-06 05:51:12.278559
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type timestamp with time zone: "$start_time"
Position: 93
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:322)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:308)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:284)
at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:258)
at $anonfun$res19$1(<pastie>:57)
at $anonfun$res19$1$adapted(<pastie>:32)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
【问题讨论】:
【参考方案1】:用途:
var insertSql="insert into test_log(process_nm,start_time,process_desc)
values('$process_name','$start_time','$process_description')"
您不想使用“”,因为它不允许变量替换。
顺便说一句,我还建议在可能的情况下使用“val”而不是“var”来提高性能。
【讨论】:
我相信它不会抱怨报价。 Spark 无法将“start_time”转换为具有时区数据类型的 postgres 时间戳。有什么办法可以将其转换为时区格式的时间戳? 这不是在抱怨我同意的报价,而且您是正确的,因为 postgress 无法解释您发送的内容,但问题是您实际上是在发送“$start_time”。您的错误消息在上面说明了这一点。仅供参考,“””它是 scala 的原始插值器,不进行变量插值。 如果您觉得这个答案有帮助并且您感到舒服,您可以将其标记为正确吗? 我用单双引号试过了,还是一样的问题。 你能用你更新的代码更新问题吗?以上是关于将 spark current_timestamp 值推送到带有 timezone 列的 postgres 时间戳的主要内容,如果未能解决你的问题,请参考以下文章
如何通过 python 将 current_timestamp 插入 Postgres
在 Redshift 中,如何使用 UTC 时区将 current_timestamp 转换为时间戳?
有没有办法让 CURRENT_TIMESTAMP 只在数据库中存储日期、小时和分钟?
MySQL TIMESTAMP 为默认 NULL,而不是 CURRENT_TIMESTAMP