写入数据帧时出错:java.lang.RuntimeException:scala.Tuple2 不是 struct<retailer:string,postcode:int> 架构的有效

Posted

技术标签:

【中文标题】写入数据帧时出错:java.lang.RuntimeException:scala.Tuple2 不是 struct<retailer:string,postcode:int> 架构的有效外部类型【英文标题】:Error in writing a dataFrame: java.lang.RuntimeException: scala.Tuple2 is not a valid external type for schema of struct<retailer:string,postcode:int> 【发布时间】:2019-08-08 07:33:24 【问题描述】:

我有一个数据集,我正在提取并应用一个特定的模式,然后再写成 json。

我的测试数据集如下所示:

cityID|retailer|postcode

123|a1|1

123|s1|2

123|d1|3

124|a1|4

124|s1|5

124|d1|6

我想按城市 ID 分组。然后我应用以下模式并将其放入数据框中。然后我想把数据写成json。我的代码如下:

按城市 ID 分组

val rdd1 = cridf.rdd.map(x=>(x(0).toString, (x(1).toString, x(2).toString))).groupByKey() 

将 RDD 映射到行

val final1 = rdd1.map(x=>Row(x._1,x._2.toList))

应用架构

val schema2 = new StructType()
.add("cityID", StringType)
.add("reads", ArrayType(new StructType()
.add("retailer", StringType)
.add("postcode", IntegerType)))

创建数据框

val parsedDF2 = spark.createDataFrame(final1, schema2)

写入 json 文件

parsedDF2.write.mode("overwrite")
.format("json")
.option("header", "false")
.save("/XXXX/json/testdata")

作业由于以下错误而中止:

java.lang.RuntimeException:编码时出错:

java.lang.RuntimeException: scala.Tuple2 不是结构模式的有效外部类型

【问题讨论】:

@JānisŠ。在 final1 x._2 中是零售商和邮政编码的列表 是的,我忽略了这一点。 【参考方案1】:

您可以直接从您的数据框中进行转换。给你:

   val rawData = spark.read.option("header", "true").option("delimiter", "|").csv("57407427.csv")

   import org.apache.spark.sql.functions._
   val readsDf = rawData.withColumn("reads",struct("retailer", "postcode")).drop("retailer", "postcode" )

   val finalJsonDf = readsDf.groupBy("cityID").agg(collect_list("reads").alias("reads"))

   finalJsonDf.printSchema() //for testing the schema

   finalJsonDf.coalesce(1).write.mode("overwrite")
     .format("json")
     .option("header", "false")
     .save("57407427_Op.json")

希望您也尝试写出相同的 json 输出:

 "cityID":"124","reads":["retailer":"a1","postcode":"4","retailer":"s1","postcode":"5","retailer":"d1","postcode":"6"]
 "cityID":"123","reads":["retailer":"a1","postcode":"1","retailer":"s1","postcode":"2","retailer":"d1","postcode":"3"]

【讨论】:

谢谢!我最终做了一些非常相似的事情,它给了我正确的结果:) 干杯【参考方案2】:

如果您无法避免使用 RDD,则可以使用案例类:

case class Read(retailer: String, postcode: Int)
case class Record(cityId: String, reads: List[Read])

...

val rdd1 = cridf.rdd
    .map(x => (x.head, Read(x(1), x(2).toInt)))
    .groupByKey

val final1 = rdd1
    .map(x => Record(x._1, x._2.toList))
    .toDF

final1
   .write
   .mode("overwrite")
   .format("json")
   .option("header", "false")
   .save("/XXXX/json/testdata")

final1 具有以下架构:

root
 |-- cityId: string (nullable = true)
 |-- reads: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- retailer: string (nullable = true)
 |    |    |-- postcode: integer (nullable = false)

不过,我认为@partha_devArch 解决方案要好得多。

更新

在您的代码中添加最少的内容并使用提供的架构,解决方案如下:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

...

val rdd1 = cridf.rdd
    .map(x => (x.head, Row(x(1), x(2).toInt)))
    .groupByKey

val final1 = rdd1
    .map(x => Row(x._1, x._2.toList))(RowEncoder.apply(schema2).clsTag)

val parsedDF2 = spark.createDataFrame(final1, schema2)

parsedDF2
    .write
    .mode("overwrite")
    .format("json")
    .option("header", "false")
    .save("/XXXX/json/testdata")

【讨论】:

以上是关于写入数据帧时出错:java.lang.RuntimeException:scala.Tuple2 不是 struct<retailer:string,postcode:int> 架构的有效的主要内容,如果未能解决你的问题,请参考以下文章

启用 Java 8 时 Android Studio Ui 测试出错

在 Spark 中创建数据帧时出错

读取 csv 文件并在 python 中返回数据帧时出错

使用 pyspark 将 RDD 行转换为数据帧时出错

如何在使用转义或引号在pyspark中的文件中写入数据帧时获得完全匹配? [复制]

在 Tkinter 中切换帧时出错