Spark DataFrame 映射错误

Posted

技术标签:

【中文标题】Spark DataFrame 映射错误【英文标题】:Spark DataFrame map error 【发布时间】:2016-12-05 14:09:20 【问题描述】:

对不起,我需要再问一个问题。我希望这个不要重复。我编辑了last one,但我认为没有人看到编辑后的版本。这是问题的一个简短示例:

val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()

val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)

val new_df = //Add hundred of new columns

//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)

错误:

error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.

我想要做的是,修改每一行。在这种情况下,我知道只有 1 列,我可以像 Encoder error while trying to map dataframe row to updated row 一样处理它。 但是,如果我有数百列,我该如何解决这个问题? 如果某些行不满足条件,我想删除它们。 目前我使用:

val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)

但我不认为这是最好的解决方案。我也遇到了***Error

Exception in thread "main" java.lang.***Error
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)

求帮助:)

【问题讨论】:

我想使用 DF 因为我需要架构。 DF 是否有类似地图的功能?我想在某些条件下删除或扩展 DF 中的行。 【参考方案1】:

添加新列的 withColumn() 选项将适用于整个数据集。如果你有更多的列,它会让事情变得更糟。 您可以使用 Spark SQL 并使用 SQL 样式的查询来添加新列。这将需要更多的 sql 技能而不仅仅是 spark。而且有 100 列,维护起来可能会很困难。

您可以采用另一种方法。

您可以将 rdd 转换为数据框。然后在数据框上使用 map 并根据需要处理每一行。内部map方法,

一个。您可以根据计算收集新值

b.将这些新列值添加到主 rdd,如下所示

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

这里的row,是map方法中row的引用

c。如下创建新架构

val newColumnsStructType = StructTypeSeq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))

d。添加到旧架构

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e。使用新列创建新数据框

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)

【讨论】:

感谢您的回答,但是如何在 DataFrame 上使用具有很多列的 map() ?我收到上面的错误。我所有的列都是布尔值。 您的问题是指向现有数据框添加新列和更多列。所以上述步骤会有所帮助 问题是,如何使用具有数百列的 map() 而不会出现错误?

以上是关于Spark DataFrame 映射错误的主要内容,如果未能解决你的问题,请参考以下文章

Spark Dataframe 映射函数

如何在 Spark 中对嵌套的 Dataframe 进行平面映射

Spark SQL仅映射一列DataFrame

如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame

JDBC的ResultSet游标转spark的DataFrame,数据类型的映射以TeraData数据库为例

在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列