分组并爆炸pyspark数组类型列

Posted

技术标签:

【中文标题】分组并爆炸pyspark数组类型列【英文标题】:Splitting into groups and exploding pyspark array type column 【发布时间】:2021-02-09 19:47:23 【问题描述】:

我有静态列表group_1group_2

group_1 = [a,b,c,d,e,f,g]
group_2 = [h,i,j,k]

我有 pyspark 数据框df1,如下所示。

示例 1:

df1:

+-----+----------------------------------------+-----------------------------------------+
|id   |array1                                  |array2                                   |
+-----+----------------------------------------+-----------------------------------------+
|id1  |[a,b,c,d,group_1,group_2]               |[a,b,c,d,e,f,g,h,i,j,k]                  |
+-----+----------------------------------------+-----------------------------------------+

输出_df:

+-----+-------------------|-------------------|
|id   |col1               |col2               |
+-----+-------------------|-------------------|
|id1  |[a,b,c,d]          |[a,b,c,d]          |
|id1  |[e,f,g]            |group_1            |
|id1  |[h,i,j,k]          |group_2            |
+-----+-------------------|-------------------|

实际上,array2 列将包含来自array1 列的元素。这就是我的源数据框 (source_df1) 的样子。

如果我们看到 array1 列,则有单独的元素,如 (a,b,c,d) 以及 group_1group_2 元素,但它们放在一起是不同的。

现在我想通过爆炸来创建 pyspark 数据框,使个人和组元素按output_df 所示进行分类。

Example1 Observation:如果我们看到输出数据框output_df,第二条记录group_1 只有[e,f,g],因为其他元素已经是单个元素的一部分。

示例 2:

source_df1:

+-----+----------------------------------------+-----------------------------------------+
|id   |array1                                  |array2                                   |
+-----+----------------------------------------+-----------------------------------------+
|id1  |[a,b,group_1,group_2]                   |[a,b,c,d,e,f,g,h,i,j,k]                  |
+-----+----------------------------------------+-----------------------------------------+

输出_df:

+-----+-------------------|-------------------|
|id   |col1               |col2               |
+-----+-------------------|-------------------|
|id1  |[a,b]              |[a,b]              |
|id1  |[c,d,e,f,g]        |group_1            |
|id1  |[h,i,j,k]          |group_2            |
+-----+-------------------|-------------------|

Example2 观察:如果我们看到输出数据框 output_df。第二条记录group_1 只有[c,d,e,f,g],因为其他元素已经是单个元素的一部分。

任何人都可以帮助实现这一目标吗?

【问题讨论】:

【参考方案1】:

如果你可以使用 Spark 2.4+,你可以通过一些数组函数来实现:

from pyspark.sql import functions as F


df1 = df.withColumn(
    "individual",
    F.array_except(F.col("array1"), F.array(*[F.lit("group_1"), F.lit("group_2")]))
).withColumn(
    "group_1",
    F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
).withColumn(
    "group_2",
    F.array_except(F.array(*[F.lit(i) for i in group_2]), "individual")
).withColumn(
    "array2",
    F.explode(F.array(
        *[
            F.struct(F.array_intersect("array2", "individual").alias("col1"),
                     F.col("individual").cast("string").alias("col2")),
            F.struct(F.array_intersect("array2", "group_1").alias("col1"),
                     F.lit("group_1").alias("col2")),
            F.struct(F.array_intersect("array2", "group_2").alias("col1"),
                     F.lit("group_2").alias("col2"))
        ])
    )
).select("id", "array2.*")

df1.show(truncate=False)

#+---+------------+------------+
#|id |col1        |col2        |
#+---+------------+------------+
#|id1|[a, b, c, d]|[a, b, c, d]|
#|id1|[e, f, g]   |group_1     |
#|id1|[h, i, j, k]|group_2     |
#+---+------------+------------+

说明:

首先,将array1分成三个数组:individualgroup_1group_2。每一个都包含对应组的元素。 group_1group_2 中存在于 individual 中的元素将从这些组中移除。 然后,使用array_intersect 函数从array2 列中获取元素,这些元素存在于上面创建的三个组数组中。 最后,分解上面创建的新数组

请注意,如果您想验证group_1group_2 是否存在于array1 列中,您可以将whenarray_contains 函数一起使用:

F.when(
    F.array_contains(F.col("array1"), F.lit("group_1")),
    F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
)

在示例中,我认为它始终存在于 array1 中。

【讨论】:

以上是关于分组并爆炸pyspark数组类型列的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark Struct 列:爆炸后的奇怪行为

如何在 PySpark 中进行分组并查找列的唯一项目 [重复]

获取pyspark数组类型列的最后n个元素

Pyspark:检查数组类型列是不是包含列表中的值[重复]

使用 pyspark 将结构数组旋转到列中 - 不爆炸数组

想将key添加到pyspark dataFrame的爆炸数组中