Spark DataFrame 映射错误

Posted

技术标签:

【中文标题】Spark DataFrame 映射错误【英文标题】:Spark DataFrame map error 【发布时间】:2016-12-05 14:09:20 【问题描述】:

对不起,我需要再问一个问题。我希望这个不要重复。我编辑了last one,但我认为没有人看到编辑后的版本。这是问题的一个简短示例:

val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()

val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)

val new_df = //Add hundred of new columns

//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)

错误:

error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.

我想要做的是,修改每一行。在这种情况下,我知道只有 1 列,我可以像 Encoder error while trying to map dataframe row to updated row 一样处理它。 但是,如果我有数百列,我该如何解决这个问题? 如果某些行不满足条件,我想删除它们。 目前我使用:

val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)

但我不认为这是最好的解决方案。我也遇到了***Error

Exception in thread "main" java.lang.***Error
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)

求帮助:)

【问题讨论】:

我想使用 DF 因为我需要架构。 DF 是否有类似地图的功能?我想在某些条件下删除或扩展 DF 中的行。 【参考方案1】:

添加新列的 withColumn() 选项将适用于整个数据集。如果你有更多的列,它会让事情变得更糟。 您可以使用 Spark SQL 并使用 SQL 样式的查询来添加新列。这将需要更多的 sql 技能而不仅仅是 spark。而且有 100 列,维护起来可能会很困难。

您可以采用另一种方法。

您可以将 rdd 转换为数据框。然后在数据框上使用 map 并根据需要处理每一行。内部map方法,

一个。您可以根据计算收集新值

b.将这些新列值添加到主 rdd,如下所示

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

这里的row,是map方法中row的引用

c。如下创建新架构

val newColumnsStructType = StructTypeSeq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))

d。添加到旧架构

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e。使用新列创建新数据框

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)

【讨论】:

感谢您的回答,但是如何在 DataFrame 上使用具有很多列的 map() ?我收到上面的错误。我所有的列都是布尔值。 您的问题是指向现有数据框添加新列和更多列。所以上述步骤会有所帮助 问题是,如何使用具有数百列的 map() 而不会出现错误?

以上是关于Spark DataFrame 映射错误的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中对嵌套的 Dataframe 进行平面映射

Spark SQL仅映射一列DataFrame

如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame

JDBC的ResultSet游标转spark的DataFrame,数据类型的映射以TeraData数据库为例

在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列

Spark Dataframe API 选择多个列,将它们映射到一个固定的集合,然后联合所有