Spark:行到列(如转置或枢轴)

Posted

技术标签:

【中文标题】Spark:行到列(如转置或枢轴)【英文标题】:Spark: Rows to Columns (like transpose or pivot) 【发布时间】:2016-10-01 17:41:13 【问题描述】:

如何使用 RDD 或数据框将行转置为列。

SessionId,date,orig, dest, legind, nbr

1   9/20/16,abc0,xyz0,o,1
1   9/20/16,abc1,xyz1,o,2
1   9/20/16,abc2,xyz2,i,3
1   9/20/16,abc3,xyz3,i,4

所以我想生成新的架构,例如:

SessionId,date,orig1, orig2, orig3, orig4, dest1, dest2, dest3,dest4

1,9/20/16,abc0,abc1,null, null, xyz0,xyz1, null, null

逻辑是如果:

nbr 为 1,legind = o 然后 orig1 值(从第 1 行获取)...

nbr 为 3,legind = i,然后是 dest1 值(从第 3 行获取)

那么如何将行转置为列...

任何想法将不胜感激。

尝试使用以下选项,但它只是在单行中全部变平..

val keys = List("SessionId");
val selectFirstValueOfNoneGroupedColumns =
  df.columns
    .filterNot(keys.toSet)
    .map(_ -> "first").toMap
val grouped =
  df.groupBy(keys.head, keys.tail: _*)
    .agg(selectFirstValueOfNoneGroupedColumns).show()

【问题讨论】:

欢迎来到 SO。请花一些时间在help 上,了解您可以如何成熟地提出问题并巧妙地格式化 是的,它很简单,而且已经有了答案!按照我提供的链接。 @TheArchetypalPaul :您提供的链接,这是不同的问题和方法。 啊,那和转置无关。完全不清楚您想要的输出是什么 - 请提供更多数据,为什么输出中有 abc0 和 abc2 而 xyz0 和 xyz1 特别奇怪。您在这里所做的事情对您来说可能很明显,但对于不了解上下文的读者来说,这肯定是不明显的。 我的错误...感谢您指出,输出应该是:1,9/20/16,abc0,abc1,null, null, xyz0,xyz1, null, null 【参考方案1】:

如果使用pivot函数就比较简单了。首先让我们创建一个类似于您问题中的数据集:

import org.apache.spark.sql.functions.concat, first, lit, when

val df = Seq(
  ("1", "9/20/16", "abc0", "xyz0", "o", "1"),
  ("1", "9/20/16", "abc1", "xyz1", "o", "2"),
  ("1", "9/20/16", "abc2", "xyz2", "i", "3"),
  ("1", "9/20/16", "abc3", "xyz3", "i", "4")
).toDF("SessionId", "date", "orig", "dest", "legind", "nbr")

然后定义并附加辅助列:

// This will be the column name
val key = when($"legind" === "o", concat(lit("orig"), $"nbr"))
           .when($"legind" === "i", concat(lit("dest"), $"nbr"))

// This will be the value
val value = when($"legind" === "o", $"orig")     // If o take origin
              .when($"legind" === "i", $"dest")  // If i take dest

val withKV =  df.withColumn("key", key).withColumn("value", value)

这将导致DataFrame 像这样:

+---------+-------+----+----+------+---+-----+-----+
|SessionId|   date|orig|dest|legind|nbr|  key|value|
+---------+-------+----+----+------+---+-----+-----+
|        1|9/20/16|abc0|xyz0|     o|  1|orig1| abc0|
|        1|9/20/16|abc1|xyz1|     o|  2|orig2| abc1|
|        1|9/20/16|abc2|xyz2|     i|  3|dest3| xyz2|
|        1|9/20/16|abc3|xyz3|     i|  4|dest4| xyz3|
+---------+-------+----+----+------+---+-----+-----+

接下来让我们定义一个可能的级别列表:

val levels = Seq("orig", "dest").flatMap(x => (1 to 4).map(y => s"$x$y"))

最后是pivot

val result = withKV
  .groupBy($"sessionId", $"date")
  .pivot("key", levels)
  .agg(first($"value", true)).show

result 是:

+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
|sessionId|   date|orig1|orig2|orig3|orig4|dest1|dest2|dest3|dest4|
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
|        1|9/20/16| abc0| abc1| null| null| null| null| xyz2| xyz3|
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+

【讨论】:

以上是关于Spark:行到列(如转置或枢轴)的主要内容,如果未能解决你的问题,请参考以下文章

如何从表中选择数据以转置或交叉表方式输出?

将 QUERY 结果的 GROUPS 组转置为单列

没有聚合的 Spark 数据帧枢轴

用于转置的 Unpivot 枢轴

仅使用最小 COUNT() 转置行和列(又名枢轴)?

使用 SQL 将 XML 结构转置/展平到列