Spark DataFrame 映射错误
Posted
技术标签:
【中文标题】Spark DataFrame 映射错误【英文标题】:Spark DataFrame map error 【发布时间】:2016-12-05 14:09:20 【问题描述】:对不起,我需要再问一个问题。我希望这个不要重复。我编辑了last one,但我认为没有人看到编辑后的版本。这是问题的一个简短示例:
val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()
val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)
val new_df = //Add hundred of new columns
//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)
错误:
error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
我想要做的是,修改每一行。在这种情况下,我知道只有 1 列,我可以像 Encoder error while trying to map dataframe row to updated row 一样处理它。 但是,如果我有数百列,我该如何解决这个问题? 如果某些行不满足条件,我想删除它们。 目前我使用:
val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)
但我不认为这是最好的解决方案。我也遇到了***Error:
Exception in thread "main" java.lang.***Error
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
求帮助:)
【问题讨论】:
我想使用 DF 因为我需要架构。 DF 是否有类似地图的功能?我想在某些条件下删除或扩展 DF 中的行。 【参考方案1】:添加新列的 withColumn() 选项将适用于整个数据集。如果你有更多的列,它会让事情变得更糟。 您可以使用 Spark SQL 并使用 SQL 样式的查询来添加新列。这将需要更多的 sql 技能而不仅仅是 spark。而且有 100 列,维护起来可能会很困难。
您可以采用另一种方法。
您可以将 rdd 转换为数据框。然后在数据框上使用 map 并根据需要处理每一行。内部map方法,
一个。您可以根据计算收集新值
b.将这些新列值添加到主 rdd,如下所示
val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)
这里的row,是map方法中row的引用
c。如下创建新架构
val newColumnsStructType = StructTypeSeq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))
d。添加到旧架构
val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)
e。使用新列创建新数据框
val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
【讨论】:
感谢您的回答,但是如何在 DataFrame 上使用具有很多列的 map() ?我收到上面的错误。我所有的列都是布尔值。 您的问题是指向现有数据框添加新列和更多列。所以上述步骤会有所帮助 问题是,如何使用具有数百列的 map() 而不会出现错误?以上是关于Spark DataFrame 映射错误的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 中对嵌套的 Dataframe 进行平面映射
如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame