将行值转换为火花数据框中的列数组

Posted

技术标签:

【中文标题】将行值转换为火花数据框中的列数组【英文标题】:Converting row values into a column array in spark dataframe 【发布时间】:2016-03-31 11:33:23 【问题描述】:

我正在处理 spark 数据帧,我需要对列进行分组,并将分组行的列值转换为元素数组作为新列。 示例:

Input:

employee | Address
------------------
Micheal  |  NY
Micheal  |  NJ

Output:

employee | Address
------------------
Micheal  | (NY,NJ)

非常感谢任何帮助。!

【问题讨论】:

似乎您可以使用 groupByKey 来获取您想要的内容,这将为您提供 [Address] 的 Iterable。 @Manas 这是我得到的错误 groupByKey is not a member of org.apache.spark.sql.DataFrame 向我们展示您的代码..... 嗨@vds说你的数据框是输入=>你写Input.groupBy(Input.col("employee"))。您可以查看 spark API 参考。 collect_list() 【参考方案1】:

这是另一种解决方案 我已将数据帧转换为 rdd 以进行转换,并使用 sqlContext.createDataFrame() 将其转换回数据帧

示例.json

"employee":"Michale","Address":"NY"
"employee":"Michale","Address":"NJ"
"employee":"Sam","Address":"NY"
"employee":"Max","Address":"NJ"

Spark 应用程序

val df = sqlContext.read.json("sample.json")

// Printing the original Df
df.show()

//Defining the Schema for the aggregated DataFrame
val dataSchema = new StructType(
  Array(
    StructField("employee", StringType, nullable = true),
    StructField("Address", ArrayType(StringType, containsNull = true), nullable = true)
  )
)
// Converting the df to rdd and performing the groupBy operation
val aggregatedRdd: RDD[Row] = df.rdd.groupBy(r =>
          r.getAs[String]("employee")
        ).map(row =>
          // Mapping the Grouped Values to a new Row Object
          Row(row._1, row._2.map(_.getAs[String]("Address")).toArray)
        )

// Creating a DataFrame from the aggregatedRdd with the defined Schema (dataSchema)
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, dataSchema)

// Printing the aggregated Df
aggregatedDf.show()

输出:

+-------+--------+---+
|Address|employee|num|
+-------+--------+---+
|     NY| Michale|  1|
|     NJ| Michale|  2|
|     NY|     Sam|  3|
|     NJ|     Max|  4|
+-------+--------+---+

+--------+--------+
|employee| Address|
+--------+--------+
|     Sam|    [NY]|
| Michale|[NY, NJ]|
|     Max|    [NJ]|
+--------+--------+

【讨论】:

这个答案还可以,但是使用 RDD API 比使用 DataFrame API 慢很多(由于缺少查询优化器和钨)【参考方案2】:

如果您使用的是 Spark 2.0+,则可以使用 collect_listcollect_set。 您的查询将类似于(假设您的数据框称为 input):

import org.apache.spark.sql.functions._

input.groupBy('employee).agg(collect_list('Address))

如果您可以重复使用,请使用collect_list。如果您对重复项不满意并且只需要列表中的唯一项,请使用collect_set

希望这会有所帮助!

【讨论】:

以上是关于将行值转换为火花数据框中的列数组的主要内容,如果未能解决你的问题,请参考以下文章

将预定义的数字分配给数据框中的列行值

使用 scala 使用布尔运算折叠火花数据框中的列

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]

折叠火花数据框中的列值

在火花中比较两个数据框中的列

将数组和元组元素转换为 Pandas 数据框中的列 [重复]