Spark Scala:将任意 N 列转换为 Map

Posted

技术标签:

【中文标题】Spark Scala:将任意 N 列转换为 Map【英文标题】:Spark Scala: convert arbitrary N columns into Map 【发布时间】:2018-03-31 14:30:39 【问题描述】:

我有以下数据结构,表示电影 ID(第一列)和其余列中不同用户对该电影的评分 - 类似这样:

+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|movieId|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|  11|  12|  13|  14|  15|
+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|   1580|null|null| 3.5| 5.0|null|null|null|null|null|null|null|null|null|null|null|
|   3175|null|null|null|null|null|null|null|null|null|null|null|null|null| 5.0|null|
|   3794|null|null|null|null|null|null|null|null|null|null|null| 3.0|null|null|null|
|   2659|null|null|null| 3.0|null|null|null|null|null|null|null|null|null|null|null|

我想把这个DataFrame转成一个DataSet

final case class MovieRatings(movie_id: Long, ratings: Map[Long, Double])

所以它会是这样的

[1580, [1 -> null, 2 -> null, 3 -> 3.5, 4 -> 5.0, 5 -> null, 6 -> null, 7 -> null,...]]

等等

如何做到这一点?

这里的问题是用户数量是任意的。我想将它们压缩到一个单独的列中,使第一列保持不变。

【问题讨论】:

Spark 2.0 - Convert DataFrame to DataSet的可能重复 我不认为这是重复的,因为这个问题是我该怎么做,而那个问题是我正在尝试这样做但它不起作用,哦等等,我需要升级火花。这个问题要求教程,因此是题外话。 【参考方案1】:

首先,您必须将 DataFrame 转换为与您的案例类匹配的架构,然后您可以使用 .as[MovieRatings] 将 DataFrame 转换为 Dataset[MovieRatings]

import org.apache.spark.sql.functions._
import spark.implicits._

// define a new MapType column using `functions.map`, passing a flattened-list of
// column name (as a Long column) and column value
val mapColumn: Column = map(df.columns.tail.flatMap(name => Seq(lit(name.toLong), $"$name")): _*)

// select movie id and map column with names matching the case class, and convert to Dataset:
df.select($"movieId" as "movie_id", mapColumn as "ratings")
  .as[MovieRatings]
  .show(false)

【讨论】:

【参考方案2】:

您可以使用 spark.sql.functions.map 从任意列创建映射。它期望在键和值之间交替的序列,可以是列类型或字符串。这是一个例子:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions

case class Input(movieId: Int, a: Option[Double], b: Option[Double], c: Option[Double])

val data = Input(1, None, Option(3.5), Option(1.4)) :: 
        Input(2, Option(4.2), Option(1.34), None) :: 
        Input(3, Option(1.11), None, Option(3.32)) :: Nil

val df = sc.parallelize(data).toDF

// Exclude the PK column from the map
val mapKeys = df.columns.filterNot(_ == "movieId")

// Build the sequence of key, value, key, value, ..
val pairs = mapKeys.map(k => Seq(lit(k), col(k))).flatten

val mapped = df.select($"movieId", functions.map(pairs:_*) as "map")
mapped.show(false) 

产生这个输出:

+-------+------------------------------------+
|movieId|map                                 |
+-------+------------------------------------+
|1      |Map(a -> null, b -> 3.5, c -> 1.4)  |
|2      |Map(a -> 4.2, b -> 1.34, c -> null) |
|3      |Map(a -> 1.11, b -> null, c -> 3.32)|
+-------+------------------------------------+

【讨论】:

以上是关于Spark Scala:将任意 N 列转换为 Map的主要内容,如果未能解决你的问题,请参考以下文章

使用 Scala 将多列转换为 Spark Dataframe 上的一列地图

将 spark.sql 查询转换为 spark/scala 查询

在 spark 和 scala 中,如何将数据框转换或映射到特定列信息?

如何在 Spark 2 Scala 中将 Row 转换为 json

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

SPARK 数据框错误:在使用 UDF 拆分列中的字符串时无法转换为 scala.Function2