将 spark current_timestamp 值推送到带有 timezone 列的 postgres 时间戳

Posted

技术标签:

【中文标题】将 spark current_timestamp 值推送到带有 timezone 列的 postgres 时间戳【英文标题】:Push spark current_timestamp value to postgres timestamp with timezone column 【发布时间】:2022-01-11 14:26:04 【问题描述】:

我需要将“start_time”值推送到具有“timestamp with timezone”数据类型的列的 postgres 表中。我只需要 java jdbc 连接中的解决方案。

import spark.implicits._ 
val df1 =Seq(("Process Name","Process Description"))
                         .toDF("process_nm","process_desc") 
val df2 = df1.withColumn("start_time",current_timestamp)  
df2.show(false)             
df2.printSchema

df2.collect().foreach(row=>
     
     println("Before calling-"+row.getString(0)+"   "+row.getString(1)+"   
                                         "+row.getTimestamp(2))
    
     var process_name:String=row.getString(0)
     var process_description:String=row.getString(1) 
     var start_time=row.getTimestamp(2)
     var insertSql="""insert into test_log(process_nm,start_time,process_desc) 
                       values('$process_name','$start_time','$process_description')""" 

     import com.typesafe.config.ConfigFactory
     import org.apache.spark.sql.SparkSession
     import org.apache.spark.sql.functions.concat, lit

     import java.io.File
     import java.sql.Connection, DriverManager
 
     var db_conn_string = "jdbc:" + db_type + "://" + db_host + ":" + db_port + "/" + db_database
     val direct_conn = DriverManager.getConnection(db_conn_string, db_user, db_pass)
     val statement = direct_conn.createStatement()
     val result=statement.executeUpdate(insertSql)
     println("inserted-"+result)

     println("insert_sql-"+insertSql)  
)  

将 start_time 推送到 postgres 表时出现以下错误

Before calling-Process Name   Process Description   2021-12-06 05:51:12.278559
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type timestamp with time zone: "$start_time"
  Position: 93
  at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
  at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
  at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
  at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
  at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
  at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:322)
  at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:308)
  at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:284)
  at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:258)
  at $anonfun$res19$1(<pastie>:57)
  at $anonfun$res19$1$adapted(<pastie>:32)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)

【问题讨论】:

【参考方案1】:

用途:

 var insertSql="insert into test_log(process_nm,start_time,process_desc) 
                       values('$process_name','$start_time','$process_description')"

您不想使用“”,因为它不允许变量替换。

顺便说一句,我还建议在可能的情况下使用“val”而不是“var”来提高性能。

【讨论】:

我相信它不会抱怨报价。 Spark 无法将“start_time”转换为具有时区数据类型的 postgres 时间戳。有什么办法可以将其转换为时区格式的时间戳? 这不是在抱怨我同意的报价,而且您是正确的,因为 postgress 无法解释您发送的内容,但问题是您实际上是在发送“$start_time”。您的错误消息在上面说明了这一点。仅供参考,“””它是 scala 的原始插值器,不进行变量插值。 如果您觉得这个答案有帮助并且您感到舒服,您可以将其标记为正确吗? 我用单双引号试过了,还是一样的问题。 你能用你更新的代码更新问题吗?

以上是关于将 spark current_timestamp 值推送到带有 timezone 列的 postgres 时间戳的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 python 将 current_timestamp 插入 Postgres

在 Redshift 中,如何使用 UTC 时区将 current_timestamp 转换为时间戳?

有没有办法让 CURRENT_TIMESTAMP 只在数据库中存储日期、小时和分钟?

MySQL TIMESTAMP 为默认 NULL,而不是 CURRENT_TIMESTAMP

mysql的java derby的current_timestamp

关键字“current_timestamp”附近的语法不正确 - 但仅在一个数据库上