将带有数组的 RDD 转换为 DataFrame

Posted

技术标签:

【中文标题】将带有数组的 RDD 转换为 DataFrame【英文标题】:Converting RDD with an Array to DataFrame 【发布时间】:2017-04-12 14:52:28 【问题描述】:

我有以下 RDD

RDD[(Long, Array[(Long, Double)])]

我想把它转换成一个数据框。 我正在使用以下代码

val aStruct = new StructType(Array(
    StructField("id", LongType,nullable = true),
    StructField("neighbors",ArrayType(
        StructType(Array(
            StructField("nid", LongType),
            StructField("distance", DoubleType)
    ))),nullable = true)))
val rowRDD = neighbors.map(p => Row(p._1, p._2))
val neighborsDF = sqlContext.createDataFrame(rowRDD,aStruct)

这编译正确,但给我一个运行时错误

 Error while encoding: java.lang.RuntimeException: scala.Tuple2$mcJD$sp is not a valid external type for schema of struct<nid:bigint,distance:double>

我的架构不合适吗?

我也试过了

import spark.implicits._
val neighborsDF = neighbors.toDF()

但为此我得到以下运行时错误

Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;

在我调用 toDF() 的那一行

我在这里做错了什么? (我期待很多:p)

所以我确实理解了这个问题,我在 RDD 中有一个元组数组,但我似乎找不到 Spark SQL 模式的元组类型

【问题讨论】:

第二个问题是因为编译和运行时的 Scala 版本不匹配和/或 Scala 版本的库 @T.Gawęda 实际上你是对的,这个问题确实存在。在我的 build.sbt 中,我安装了 scala 版本 2.10.5,而我的系统上安装了 scala 版本 2.12.1。我刚刚下载了 2.10.5 并设置了我的路径并通过 scala -version 检查,它是 2.10.5。但我仍然遇到同样的错误 还要检查 Spark 版本,如果名称中包含 2.11,它是为 Scala 2.11 编译的。 Spark 2.x 默认基于 Scala 2.11 @T.Gawęda Spark 2.10 使用 Scala 版本 2.11.8。但是当我在 build.sbt 中使用 2.11.8 并使用 sbt compile 进行编译时。我的代码中有两个错误......我猜是时候调试了 @T.Gawęda 所以问题是,我正在使用 N 个最近邻居的库。他们使用 scala 版本 2.10.5。我尝试在 build.sbt 中使用 scala 版本 2.11.8 编译他们的代码,它给出了错误。我似乎无法调试它。所以是的,这是一个死胡同 【参考方案1】:

正如@T.Gawęda 指出的那样,我使用单独版本的 Scala 进行编译 (2.10.6),并且在运行时预构建的 Spark 2.1.0 附带 Scala 版本 2.11.8。这导致了以下错误 Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;

使用时

neighborsDF = neighbors.toDF()

所以我必须用正确版本的 scala 编译我的代码,但是由于我正在使用的库(spark-neighbors)在使用 Scala 版本 2.11.8 时会引发编译错误,所以我的项目更新 scala 版本并不是真的可能的。 所以我决定使用链接Building for Scala 2.10中的指南使用Scala 2.10.6 构建一个Spark 版本@

所以现在我的 Spark 使用 Scala 版本 2.10.6 运行,我使用相同的版本使用 sbt 编译我的代码。因此,RDD 被转换为数据帧而没有任何错误。 希望它也能帮助其他人

【讨论】:

以上是关于将带有数组的 RDD 转换为 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

将带有嵌套标签的 XML 读入 Spark RDD,并转换为 JSON

如何在 Scala 中将 RDD 转换为二维数组?

如何在pyspark中将rdd行转换为带有json结构的数据框?

将 RDD 转换为 DataFrame Spark Streaming 时的 ClassCastException

Spark使用类将rdd转换为数据框

DataFrame:将列内的数组转换为 RDD[Array[String]]