用于 PostgreSQL 插入的 Spark Scala DataFrame 单行转换为 JSON

Posted

技术标签:

【中文标题】用于 PostgreSQL 插入的 Spark Scala DataFrame 单行转换为 JSON【英文标题】:Spark Scala DataFrame Single Row conversion to JSON for PostrgeSQL Insertion 【发布时间】:2016-04-29 00:04:59 【问题描述】:

使用名为 lastTail 的 DataFrame,我可以像这样进行迭代:

import scalikejdbc._
// ... 
// Do Kafka Streaming to create DataFrame lastTail
// ...

lastTail.printSchema

lastTail.foreachPartition(iter => 

// open database connection from connection pool
// with scalikeJDBC (to PostgreSQL) 

  while(iter.hasNext) 
    val item = iter.next()
    println("****")
    println(item.getClass)
    println(item.getAs("fileGid"))
    println("Schema: "+item.schema)
    println("String: "+item.toString())
    println("Seqnce: "+item.toSeq)

    // convert this item into an XXX format (like JSON)
    // write row to DB in the selected format
  
)

这会输出“类似的东西”(带有编辑): root |-- fileGid: string (nullable = true) |-- eventStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true) |-- revisionStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true)

并且(只有一个迭代项 - 已编辑,但希望语法也足够好)

**** class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 12345 Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false)) String: [12345,[1,4,edit],[1,4,revision]] Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])

注意:我在 https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala 上做类似 val metric = iter.sum 的部分,但使用 DataFrames 代替。我也在关注http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning 上看到的“使用 foreachRDD 的设计模式”。

如何转换 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (见https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala) 将迭代项转换为易于写入(JSON 或 ...? - 我是开放的)到 PostgreSQL 中的东西。 (如果不是 JSON,请建议如何将此值读回 DataFrame 以供其他时间使用。)

【问题讨论】:

【参考方案1】:

好吧,我想出了一种不同的方法来解决这个问题。

val ltk = lastTail.select($"fileGid").rdd.map(fileGid => fileGid.toString)
val ltv = lastTail.toJSON
val kvPair = ltk.zip(ltv)

然后我会简单地遍历 RDD 而不是 DataFrame。

kvPair.foreachPartition(iter => 
  while(iter.hasNext) 
    val item = iter.next()
    println(item.getClass)
    println(item)
  
)

除了数据,我得到了class scala.Tuple2,这使得在 JDBC / PostgreSQL 中存储 KV 对成为一种更简单的方法。

我确信还有其他方法不是解决方法。

【讨论】:

甚至更好 - @zero323 向我指出了这个主题以改进我的答案的第一部分(即删除 zip) - ***.com/questions/36157810/spark-row-to-json

以上是关于用于 PostgreSQL 插入的 Spark Scala DataFrame 单行转换为 JSON的主要内容,如果未能解决你的问题,请参考以下文章

Postgresql:在某些类型的数字之间插入空格

如何将 DELETE 的返回值插入到 postgresql 中的 INSERT 中?

如何在postgresql插入查询中插入当前日期时间[重复]

PostgreSQL INSERT 插入一个枚举数组

PostgreSQL 9.3 触发函数以参数化名称插入表

7.PostgreSQL操作语句