spark为每组动态创建struct/json

Posted

技术标签:

【中文标题】spark为每组动态创建struct/json【英文标题】:spark dynamically create struct/json per group 【发布时间】:2018-09-29 19:19:18 【问题描述】:

我有一个类似的 spark 数据框

+-----+---+---+---+------+
|group|  a|  b|  c|config|
+-----+---+---+---+------+
|    a|  1|  2|  3|   [a]|
|    b|  2|  3|  4|[a, b]|
+-----+---+---+---+------+
val df = Seq(("a", 1, 2, 3, Seq("a")),("b", 2, 3,4, Seq("a", "b"))).toDF("group", "a", "b","c", "config")

如何添加额外的列,即

df.withColumn("select_by_config", <<>>).show

作为一个结构或 JSON,它在类似于名为 struct / spark struct / json 列的配置单元中组合了许多列(由config 指定)?请注意,此结构是每个组特定的,而不是整个数据帧的常量;它在config 列中指定。

我可以想象df.map 可以解决问题,但序列化开销似乎并不高效。这如何通过仅 SQL 表达式来实现?也许作为一个地图类型的列?

编辑

2.2 的一个可能但非常笨拙的解决方案是:

val df = Seq((1,"a", 1, 2, 3, Seq("a")),(2, "b", 2, 3,4, Seq("a", "b"))).toDF("id", "group", "a", "b","c", "config")
  df.show
  import spark.implicits._
  final case class Foo(id:Int, c1:Int, specific:Map[String, Int])
  df.map(r => 
    val config = r.getAs[Seq[String]]("config")
    print(config)
    val others = config.map(elem => (elem, r.getAs[Int](elem))).toMap
    Foo(r.getAs[Int]("id"), r.getAs[Int]("c"), others)
  ).show

有没有更好的方法来解决 2.2 的问题?

【问题讨论】:

【参考方案1】:

如果您使用最近的版本(Spark 2.4.0 RC 1 或更高版本),则高阶函数的组合应该可以解决问题。创建列映射:

import org.apache.spark.sql.functions.
  array, col, expr, lit, map_from_arrays, map_from_entries


val cols = Seq("a", "b", "c")

val dfm = df.withColumn(
  "cmap", 
  map_from_arrays(array(cols map lit: _*), array(cols map col: _*))
)

transformconfig

dfm.withColumn(
  "config_mapped",
   map_from_entries(expr("transform(config, k -> struct(k, cmap[k]))"))
).show

// +-----+---+---+---+------+--------------------+----------------+
// |group|  a|  b|  c|config|                cmap|   config_mapped|
// +-----+---+---+---+------+--------------------+----------------+
// |    a|  1|  2|  3|   [a]|[a -> 1, b -> 2, ...|        [a -> 1]|
// |    b|  2|  3|  4|[a, b]|[a -> 2, b -> 3, ...|[a -> 2, b -> 3]|
// +-----+---+---+---+------+--------------------+----------------+

【讨论】:

以上是关于spark为每组动态创建struct/json的主要内容,如果未能解决你的问题,请参考以下文章

如何动态地将行转换为列,并为每列使用不同的列名

在 spark Dataframe 中动态创建多列

在 Spark Scala 中动态创建数据帧

Spark Scala动态创建可序列化对象

PowerPivot DAX - 每组动态排名(每组最小值)

Spark 中动态分区的 LeaseExpiredException