将 spark 数据框映射列转换为 json

Posted

技术标签:

【中文标题】将 spark 数据框映射列转换为 json【英文标题】:Convert spark dataframe map column to json 【发布时间】:2018-04-11 16:27:31 【问题描述】:

我有一个具有以下架构和示例记录的数据框

root
 |-- name: string (nullable = true)
 |-- matches: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)

+---------------+------------------------------------------------------------------------------------------+
|name           |matches                                                                                   |
+---------------+------------------------------------------------------------------------------------------+
|CVS_Extra      |Map(MLauer -> 1, MichaelBColeman -> 1, OhioFoodbanks -> 1, 700wlw -> 1, cityofdayton -> 1)|

我正在尝试使用以下代码(json4s 库)将地图类型列转换为 json:

val d = countDF.map( row => (row(0),convertMapToJSON(row(1).asInstanceOf[Map[String, Int]]).toString()))

但是失败了

java.lang.ClassNotFoundException: scala.Any
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1210)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1202)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:50)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:45)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1202)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:682)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:614)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)

Scala Version - 2.11, json4s-jackson_2.11  & spark 2.2.0

谁能建议如何克服这个错误。提前致谢。

【问题讨论】:

【参考方案1】:

您的代码失败是因为您错误地使用了apply 方法。您应该使用例如:

countDF.map(row => 
  (row.getString(0), convertMapToJSON(getMap[String, Int](1)).toString())
)

更多信息请见Spark extracting values from a Row。

但你只需要select / withColumnto_json

import org.apache.spark.sql.functions.to_json

countDF.withColumn("matches", to_json($"matches"))

如果您的函数使用更复杂的逻辑,请使用udf

import org.apache.spark.sql.functions.udf

val convert_map_to_json = udf(
  (map: Map[String, Int]) => convertMapToJSON(map).toString
)

countDF.withColumn("matches", convert_map_to_json($"matches"))

【讨论】:

以上是关于将 spark 数据框映射列转换为 json的主要内容,如果未能解决你的问题,请参考以下文章

将带有 JSON 对象数组的 Spark 数据框列转换为多行

在 spark 和 scala 中,如何将数据框转换或映射到特定列信息?

通过将键作为列将 json 字典转换为 spark 数据帧

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]

Spark 数据框将嵌套的 JSON 转换为单独的列

如何将StructType从Spark中的json数据框分解为行而不是列