Apache Spark 如何将新列从列表/数组附加到 Spark 数据帧

Posted

技术标签:

【中文标题】Apache Spark 如何将新列从列表/数组附加到 Spark 数据帧【英文标题】:Apache Spark how to append new column from list/array to Spark dataframe 【发布时间】:2017-06-06 17:03:55 【问题描述】:

我正在使用 Apache Spark 2.0 数据帧/数据集 API 我想从值列表中向我的数据框添加一个新列。我的列表具有与给定数据框相同数量的值。

val list = List(4,5,10,7,2)
val df   = List("a","b","c","d","e").toDF("row1")

我想做这样的事情:

val appendedDF = df.withColumn("row2",somefunc(list))
df.show()
// +----+------+
// |row1 |row2 |
// +----+------+
// |a    |4    |
// |b    |5    |
// |c    |10   |
// |d    |7    |
// |e    |2    |
// +----+------+

对于任何我会很高兴的想法,我的数据框实际上包含更多列。

【问题讨论】:

【参考方案1】:

你可以这样做:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._    

// create rdd from the list
val rdd = sc.parallelize(List(4,5,10,7,2))
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:28

// zip the data frame with rdd
val rdd_new = df.rdd.zip(rdd).map(r => Row.fromSeq(r._1.toSeq ++ Seq(r._2)))
// rdd_new: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[33] at map at <console>:32

// create a new data frame from the rdd_new with modified schema
spark.createDataFrame(rdd_new, df.schema.add("new_col", IntegerType)).show
+----+-------+
|row1|new_col|
+----+-------+
|   a|      4|
|   b|      5|
|   c|     10|
|   d|      7|
|   e|      2|
+----+-------+

【讨论】:

【参考方案2】:

为了完整性而添加:输入list(存在于驱动程序内存中)与DataFrame 具有相同大小的事实表明这是一个小的DataFrame,因此您可以考虑collect()-使用它,使用list 压缩,并在需要时转换回DataFrame

df.collect()
  .map(_.getAs[String]("row1"))
  .zip(list).toList
  .toDF("row1", "row2")

这不会更快,但如果数据真的很小,它可能可以忽略不计,并且代码(可以说)更清晰。

【讨论】:

以上是关于Apache Spark 如何将新列从列表/数组附加到 Spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Java UDF将新列添加到Spark数据帧

如何使用 apache commons 将新列添加到 csv 文件

如何使用apache commons将新列添加到csv文件中

如何将新列和相应的行特定值添加到火花数据帧?

将 Pyspark Dataframe 列从数组转换为新列

如何向 pandas df 添加一个新列,该列从另一个数据帧返回同一组中更大的最小值