[Scala][Spark]: transform a column in dataframe, keeping other columns, using withColumn and map [er

Posted

技术标签:

【中文标题】[Scala][Spark]: transform a column in dataframe, keeping other columns, using withColumn and map [error: missing parameter type]【英文标题】: 【发布时间】:2021-11-30 04:48:01 【问题描述】:

我有数据框 df

|            name| languagesAtSchool|currentState|
+----------------+------------------+------------+
|    James,,Smith|[Java, Scala, C++]|          CA|
|   Michael,Rose,|[Spark, Java, C++]|          NJ|
|Robert,,Williams|   [CSharp, VB, R]|          NV|
+----------------+------------------+------------+

我想要


+----------------+--------+-----+
|Name            |language|State|
+----------------+--------+-----+
|James,,Smith    |Java    |CA   |
|James,,Smith    |Scala   |CA   |
|James,,Smith    |C++     |CA   |
|Michael,Rose,   |Spark   |NJ   |
|Michael,Rose,   |Java    |NJ   |
|Michael,Rose,   |C++     |NJ   |
|Robert,,Williams|CSharp  |NV   |
|Robert,,Williams|VB      |NV   |
|Robert,,Williams|R       |NV   |
+----------------+--------+-----+

我已经尝试了以下完美的工作

val df2=df.flatMap(f=> f.getSeq[String](1).map((f.getString(0),_,f.getString(2))))
    .toDF("Name","language","State")

但我希望在不指定要保留的其他列的情况下工作,因此我尝试了

val df2 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))

然后它给出

Unknown Error: <console>:40: error: missing parameter type
       val df3 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))
                                                     ^

因此,我希望 Spark 中的某些内容可以在不丢弃其他列的情况下转换列。 我猜原因是 scala 无法确定类型,但我无法修复它。 我是 scala 的新手,感谢您的帮助!

【问题讨论】:

【参考方案1】:

你要找的方法是explode:

def explode(e: Column): 列 为给定数组或映射列中的每个元素创建一个新行。除非另有说明,否则对数组中的元素使用默认列名 col,对映射中的元素使用键和值。 自从 1.3.0

df.withColumn("language", explode(col("language"))

【讨论】:

【参考方案2】:

explode 正好适用于这种情况 - 它拆分数组列,因此列表中的每个元素都将位于单独的行中。

这是一个完整的输出示例:

package org.example

import org.apache.spark.sql._
import org.apache.spark.sql.functions.col, explode
import org.apache.spark.sql.types.ArrayType, StringType, StructType


object App 

  def main(args: Array[String]): Unit = 
    val spark: SparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    import spark.implicits._

    // create dataframe with test data
    val data = Seq(
      Row("James,,Smith", List("java", "scala"), "ca"),
      Row("Robert,,Williams", List("c", "c++"), "nv")
    )

    val schema = new StructType()
      .add("name", StringType)
      .add("languages", ArrayType(StringType))
      .add("current_state", StringType)

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

    df.show(false)
//    +----------------+-------------+-------------+
//    |name            |languages    |current_state|
//    +----------------+-------------+-------------+
//    |James,,Smith    |[java, scala]|ca           |
//    |Robert,,Williams|[c, c++]     |nv           |
//    +----------------+-------------+-------------+

    // use explode to split the array values into different rows
    df.withColumn("language", explode(col("languages"))).drop("languages").show()

//    +----------------+-------------+--------+
//    |            name|current_state|language|
//    +----------------+-------------+--------+
//    |    James,,Smith|           ca|    java|
//    |    James,,Smith|           ca|   scala|
//    |Robert,,Williams|           nv|       c|
//    |Robert,,Williams|           nv|     c++|
//    +----------------+-------------+--------+

  

【讨论】:

以上是关于[Scala][Spark]: transform a column in dataframe, keeping other columns, using withColumn and map [er的主要内容,如果未能解决你的问题,请参考以下文章

Spark IMF传奇行动第17课Transformations实战总结

如何成为云计算大数据Spark高手

如何在 MLlib 中编写自定义 Transformer?

Spark——传递函数与闭包

scala为什么要清理闭包

等效于 Scala Dataset#transform 方法的 Pyspark 转换方法