Convert Spark DataFrame map column to JSON
Posted: 2018-04-11 16:27:31
Question: I have a DataFrame with the following schema and sample record:
root
|-- name: string (nullable = true)
|-- matches: map (nullable = true)
| |-- key: string
| |-- value: integer (valueContainsNull = false)
+---------------+------------------------------------------------------------------------------------------+
|name |matches |
+---------------+------------------------------------------------------------------------------------------+
|CVS_Extra |Map(MLauer -> 1, MichaelBColeman -> 1, OhioFoodbanks -> 1, 700wlw -> 1, cityofdayton -> 1)|
I am trying to convert the map-type column to JSON with the following code (using the json4s library):
val d = countDF.map( row => (row(0),convertMapToJSON(row(1).asInstanceOf[Map[String, Int]]).toString()))
But it fails with:
java.lang.ClassNotFoundException: scala.Any
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1210)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1202)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:50)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:45)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1202)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:682)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:84)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:614)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
Scala Version - 2.11, json4s-jackson_2.11 & spark 2.2.0
Can anyone suggest how to overcome this error? Thanks in advance.
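Answer 1: Your code fails because you use the apply method incorrectly. row(0) is Row.apply, which returns Any, so the mapped tuple has type (Any, String) and Spark cannot derive an encoder for it (hence the ClassNotFoundException: scala.Any raised from newProductEncoder). Use the typed getters instead, for example: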
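countDF.map(row =>
  // Typed getters give the tuple a concrete type that Spark can encode.
  // getMap returns scala.collection.Map; .toMap yields the immutable Map
  // that the original asInstanceOf cast suggests convertMapToJSON expects.
  (row.getString(0), convertMapToJSON(row.getMap[String, Int](1).toMap).toString())
)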
For more information, see "Spark extracting values from a Row".
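To make the typed-getter point concrete, here is a minimal sketch that builds a one-row DataFrame shaped like the one in the question and maps it to a (String, Int) pair. The object name TypedGetterSketch, the helper matchCounts, and the local[*] session are assumptions made for illustration; they are not from the question.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical names throughout; only Row.getString / Row.getMap come from the answer.
object TypedGetterSketch {
  // Row.apply(i) is typed as Any, so a tuple built from row(0) would need an
  // encoder for Any. getString and getMap return concrete types instead.
  def matchCounts(spark: SparkSession, df: DataFrame): Dataset[(String, Int)] = {
    import spark.implicits._ // supplies the Encoder[(String, Int)]
    df.map(row => (row.getString(0), row.getMap[String, Int](1).size))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed-getter-sketch")
      .getOrCreate()
    import spark.implicits._

    // One record with the same column names and types as the question's schema.
    val countDF = Seq(
      ("CVS_Extra", Map("MLauer" -> 1, "700wlw" -> 1))
    ).toDF("name", "matches")

    matchCounts(spark, countDF).show(false)
    spark.stop()
  }
}
```

The same pattern applies when the second tuple element is the JSON string rather than the map size.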
But all you really need is select / withColumn together with to_json:
import org.apache.spark.sql.functions.to_json
countDF.withColumn("matches", to_json($"matches"))
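The snippet above shows the withColumn form; the select form mentioned alongside it could look like the sketch below. It assumes a Spark version whose to_json accepts map columns (worth checking against the 2.2.0 mentioned in the question), and the object name ToJsonSelectSketch and helper withJsonMatches are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, to_json}

// Hypothetical names; to_json, col, select and alias are standard Spark SQL API.
object ToJsonSelectSketch {
  // Keeps "name" as-is and replaces the map column with its JSON string form.
  def withJsonMatches(df: DataFrame): DataFrame =
    df.select(col("name"), to_json(col("matches")).alias("matches"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("to-json-select-sketch")
      .getOrCreate()
    import spark.implicits._

    val countDF = Seq(
      ("CVS_Extra", Map("MLauer" -> 1, "700wlw" -> 1))
    ).toDF("name", "matches")

    val jsonDF = withJsonMatches(countDF)
    jsonDF.printSchema() // "matches" should now be a string column
    jsonDF.show(false)
    spark.stop()
  }
}
```

Using col("...") instead of $"..." avoids the need for import spark.implicits._ inside the helper.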
If your function uses more complex logic, wrap it in a udf:
import org.apache.spark.sql.functions.udf
val convert_map_to_json = udf(
(map: Map[String, Int]) => convertMapToJSON(map).toString
)
countDF.withColumn("matches", convert_map_to_json($"matches"))
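Since the question does not show convertMapToJSON, the following sketch uses a hypothetical plain-Scala stand-in for it so the udf approach can be run end to end. The stand-in only escapes quotes and backslashes in keys and sorts keys for stable output; it is an illustration, not the asker's json4s-based function.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object UdfJsonSketch {
  // Hypothetical stand-in for the asker's convertMapToJSON (not shown in the question).
  // Sorts keys so the output is deterministic; escapes backslashes and quotes in keys.
  def convertMapToJSON(m: scala.collection.Map[String, Int]): String =
    m.toSeq.sortBy(_._1)
      .map { case (k, v) =>
        "\"" + k.replace("\\", "\\\\").replace("\"", "\\\"") + "\":" + v
      }
      .mkString("{", ",", "}")

  // Spark hands map columns to Scala udfs as Map[String, Int].
  // Assumes the column holds no null maps; a null would throw inside the udf.
  val convertMapToJsonUdf = udf((m: Map[String, Int]) => convertMapToJSON(m))

  def withJsonMatches(df: DataFrame): DataFrame =
    df.withColumn("matches", convertMapToJsonUdf(col("matches")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-json-sketch")
      .getOrCreate()
    import spark.implicits._

    val countDF = Seq(
      ("CVS_Extra", Map("MLauer" -> 1, "700wlw" -> 1))
    ).toDF("name", "matches")

    withJsonMatches(countDF).show(false)
    spark.stop()
  }
}
```

A udf is opaque to the optimizer, so the built-in to_json is usually preferable when it covers the required logic.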