如何爆炸列?

Posted

技术标签:

【中文标题】如何爆炸列?【英文标题】:How to explode columns? 【发布时间】:2016-05-23 12:38:12 【问题描述】:

之后:

val df = Seq((1, Vector(2, 3, 4)), (1, Vector(2, 3, 4))).toDF("Col1", "Col2")

我在 Apache Spark 中有这个 DataFrame:

+------+---------+
| Col1 | Col2    |
+------+---------+
|  1   |[2, 3, 4]|
|  1   |[2, 3, 4]|
+------+---------+

如何将其转换为:

+------+------+------+------+
| Col1 | Col2 | Col3 | Col4 |
+------+------+------+------+
|  1   |  2   |  3   |  4   |
|  1   |  2   |  3   |  4   |
+------+------+------+------+

【问题讨论】:

【参考方案1】:

不与 RDD 相互转换的解决方案:

df.select($"Col1", $"Col2"(0) as "Col2", $"Col2"(1) as "Col3", $"Col2"(2) as "Col3")

或者说更好:

val nElements = 3
df.select(($"Col1" +: Range(0, nElements).map(idx => $"Col2"(idx) as "Col" + (idx + 2)):_*))

Spark 数组列的大小不固定,例如:

+----+------------+
|Col1|        Col2|
+----+------------+
|   1|   [2, 3, 4]|
|   1|[2, 3, 4, 5]|
+----+------------+

因此,没有办法获得 列的数量并创建这些列。如果你知道大小总是一样的,你可以这样设置nElements

val nElements = df.select("Col2").first.getList(0).size

【讨论】:

不错的答案。有没有办法使用高级数据集 API 以更安全的方式实现上述目标?例如我怎样才能摆脱 $"Col1"? 你到底是什么意思?我想不会通过不将其添加到select 来摆脱它 :) 我猜你不想明确命名它?但我不确定您如何看待类型安全必然发挥作用。用你想做的最小例子来创建一个新问题可能是值得的。 确定我会添加另一个问题。但我的意思是 Spark 的 DataSet 提供了良好的编译时安全性。其中 $"col1" 将在运行时进行评估。因此,如果 col1 不存在于数据集中,我会在 select 语句中得到编译时错误,而不是在运行时得到。【参考方案2】:

只是为了提供sgvd's answer 的 Pyspark 版本。如果数组列在Col2,那么这个select语句会将Col2中每个数组的第一个nElements移动到它们自己的列中:

from pyspark.sql import functions as F            
df.select([F.col('Col2').getItem(i) for i in range(nElements)])

【讨论】:

如何将“Col1”添加到 select 语句中? @user422930,谢谢你的问题,抱歉我现在才看到。尝试执行以下操作:from pyspark.sql import functions as F df.select([F.col('Col1')]+[F.col('Col2').getItem(i) for i in range(nElements)]) 我还没有测试过,但请告诉我它是否有效。【参考方案3】:

只需添加到 sgvd 的解决方案:

如果大小不总是一样的,你可以这样设置nElements:

val nElements = df.select(size('Col2).as("Col2_count"))
                  .select(max("Col2_count"))
                  .first.getInt(0)

【讨论】:

【参考方案4】:

您可以使用地图:

df.map 
    case Row(col1: Int, col2: mutable.WrappedArray[Int]) => (col1, col2(0), col2(1), col2(2))
.toDF("Col1", "Col2", "Col3", "Col4").show()

【讨论】:

如果我不知道阵列上有多少列怎么办?我想要像explodeToColums这样的东西

以上是关于如何爆炸列?的主要内容,如果未能解决你的问题,请参考以下文章

Spark 在 Scala 中爆炸 - 将爆炸列添加到行

带有索引的爆炸列

解决 VAE 中的爆炸梯度

爆炸和拆分列导致数据不匹配

pandas DataFrame 爆炸列内容[重复]

在 Pyspark 中爆炸不是数组的结构列