[Scala][Spark]: transform a column in dataframe, keeping other columns, using withColumn and map [er
Posted
技术标签:
【中文标题】[Scala][Spark]: transform a column in dataframe, keeping other columns, using withColumn and map [error: missing parameter type]【英文标题】: 【发布时间】:2021-11-30 04:48:01 【问题描述】:我有数据框 df
| name| languagesAtSchool|currentState|
+----------------+------------------+------------+
| James,,Smith|[Java, Scala, C++]| CA|
| Michael,Rose,|[Spark, Java, C++]| NJ|
|Robert,,Williams| [CSharp, VB, R]| NV|
+----------------+------------------+------------+
我想要
+----------------+--------+-----+
|Name |language|State|
+----------------+--------+-----+
|James,,Smith |Java |CA |
|James,,Smith |Scala |CA |
|James,,Smith |C++ |CA |
|Michael,Rose, |Spark |NJ |
|Michael,Rose, |Java |NJ |
|Michael,Rose, |C++ |NJ |
|Robert,,Williams|CSharp |NV |
|Robert,,Williams|VB |NV |
|Robert,,Williams|R |NV |
+----------------+--------+-----+
我已经尝试了以下完美的工作
val df2=df.flatMap(f=> f.getSeq[String](1).map((f.getString(0),_,f.getString(2))))
.toDF("Name","language","State")
但我希望在不指定要保留的其他列的情况下工作,因此我尝试了
val df2 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))
然后它给出
Unknown Error: <console>:40: error: missing parameter type
val df3 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))
^
因此,我希望 Spark 中的某些内容可以在不丢弃其他列的情况下转换列。 我猜原因是 scala 无法确定类型,但我无法修复它。 我是 scala 的新手,感谢您的帮助!
【问题讨论】:
【参考方案1】:你要找的方法是explode
:
def explode(e: Column): 列 为给定数组或映射列中的每个元素创建一个新行。除非另有说明,否则对数组中的元素使用默认列名 col,对映射中的元素使用键和值。 自从 1.3.0
df.withColumn("language", explode(col("language"))
【讨论】:
【参考方案2】:explode
正好适用于这种情况 - 它拆分数组列,因此列表中的每个元素都将位于单独的行中。
这是一个完整的输出示例:
package org.example
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col, explode
import org.apache.spark.sql.types.ArrayType, StringType, StructType
object App
def main(args: Array[String]): Unit =
val spark: SparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
import spark.implicits._
// create dataframe with test data
val data = Seq(
Row("James,,Smith", List("java", "scala"), "ca"),
Row("Robert,,Williams", List("c", "c++"), "nv")
)
val schema = new StructType()
.add("name", StringType)
.add("languages", ArrayType(StringType))
.add("current_state", StringType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.show(false)
// +----------------+-------------+-------------+
// |name |languages |current_state|
// +----------------+-------------+-------------+
// |James,,Smith |[java, scala]|ca |
// |Robert,,Williams|[c, c++] |nv |
// +----------------+-------------+-------------+
// use explode to split the array values into different rows
df.withColumn("language", explode(col("languages"))).drop("languages").show()
// +----------------+-------------+--------+
// | name|current_state|language|
// +----------------+-------------+--------+
// | James,,Smith| ca| java|
// | James,,Smith| ca| scala|
// |Robert,,Williams| nv| c|
// |Robert,,Williams| nv| c++|
// +----------------+-------------+--------+
【讨论】:
以上是关于[Scala][Spark]: transform a column in dataframe, keeping other columns, using withColumn and map [er的主要内容,如果未能解决你的问题,请参考以下文章