基于一百列对在 Spark 数据框中创建新列

Posted

技术标签:

【中文标题】基于一百列对在 Spark 数据框中创建新列【英文标题】:Create new columns in Spark dataframe based on a hundred column pairs 【发布时间】:2022-01-19 15:31:19 【问题描述】:

我正在尝试根据 100 列 (sch0,shm2...shm100) 中的值创建大约 9-10 列,但是这些列的值将是列中的值 (idm0,idm1....idm100 ) 这是同一数据框的一部分。

除了这 2 对 100 之外,还有其他列。 问题是,并不是所有的方案(schm0,schm1..schm100)都会有值,我们必须遍历每一个来找出值并相应地创建列,85+列大部分时间都是空的,所以我们需要忽略它们。

输入数据框示例:

+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+
|col1|col2|col3|sch0|idsm0|schm1|idsm1|schm2|idsm2|schm3|idsm3|
+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+
|   a|   b|   c|   0|    1|    2|    3|    4|    5| null| null|
+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+

schmidsm 可以达到 100,所以它基本上是 100 列的键值对。

预期输出:

----+----+----+----------+-------+-------+
|col1|col2|col3|found_zero|found_2|found_4|
+----+----+----+----------+-------+-------+
|   a|   b|   c|         1|      3|      5|
+----+----+----+----------+-------+-------+

注意:任何列都没有固定值,任何列都可以有任何值,并且我们创建的列必须基于在任何方案列中找到的值 (schm0...schm100)并且创建的列中的值将是方案的对应值,即 idsymbol (idsm0...idsm100)

我发现很难制定一个计划来做这件事,任何帮助将不胜感激。

编辑- 添加另一个输入示例--

col1|col2|schm_0|idsm_0|schm_1|idsm_1|schm_2|idsm_2|schm_3|idsm_3|schm_4|idsm_4|schm_5|idsm_5|
+----+----+------+------+------+------+------+------+------+------+------+------+------+------+
|   2|   6|    b1|   id1|     i|   id2|    xs|   id3|    ch|   id4|  null|  null|  null|  null|
|   3|   5|    b2|   id5|    x2|   id6|    ch|   id7|    be|   id8|  null|  null|    db|  id15|
|   4|   7|    b1|   id9|    ch|  id10|    xs|  id11|    us|  id12|  null|  null|  null|  null|
+----+----+------+------+------+------+------+------+------+------+------+------+------+------+

对于一个特定的记录,col(schm_0,schm_1....schm_100) 可以有大约 9 到 10 个唯一值,因为并非所有列都会填充值。

我们需要根据 9 个唯一值创建 9 个不同的列,因此对于一行,我们需要遍历 100 个 schmeme 列中的每一个,并根据找到的值收集在那里找到的所有值,分开需要创建列...并且这些创建的列中的值将是 idsm(idsm_0,idsm_1....idsm_100) 中的值

即如果 schm_0 具有值“cb”,我们需要为例如“col_cb”创建新列,并且此列中的值“col_cb”将是“idsm_0”列中的值。 同样,我们需要对所有 100 列执行此操作(我们需要省略空列)。

预期输出-

+----+----+------+------+-----+------+------+------+------+------+------+
|col1|col2|col_b1|col_b2|col_i|col_x2|col_ch|col_xs|col_be|col_us|col_db|
+----+----+------+------+-----+------+------+------+------+------+------+
|   2|   6|   id1|  null|  id2|  null|   id4|   id3|  null|  null|  null|
|   3|   5|  null|   id5| null|   id6|   1d7|  null|   id8|  null|  id15|
|   4|   7|   id9|  null| null|  null|  1d10|  id11|  null|  id12|  null|
+----+----+------+------+-----+------+------+------+------+------+------+

希望这可以清除问题陈述。 对此的任何帮助将不胜感激。

小问题再次编辑-

正如我们在上面的示例中看到的,我们创建的列是基于在方案符号列中找到的值,并且已经定义了一组将被创建的列,其数量为 10..列例如将是(col_a,col_b,col_c,cold_d,col_e,col_f,col_g_col_h,col_i,col_j)

并非所有 10 个关键字,即 (a,b,c....j) 都会一直出现在 (shcheme0.....scheme99) 下的数据集中。

要求是我们需要传递所有 10 列,如果某些键 (a,b,c...j) 不存在,则创建的列将具有空值。

【问题讨论】:

您能否给出或扩展您的示例,将多条记录作为输入和您期望的输出?仅仅看到单个记录的输出和您的描述就会让人感到困惑。 @NikunjKakadiya ,很抱歉没有说清楚,我添加了更多示例..希望有所帮助。 查看我发布的答案.. @NikunjKakadiya 感谢您的快速响应,这似乎适用于较小的方案和 idsybmol 列集,但正如我所提到的,我在实际案例中每列都有数百个,因此将所有这 100 个列在堆栈中会是明智的吗?或者我们可以尝试像 Map 这样的东西,我们可以在其中创建 100 多个列并将 scheme 和 idsm 存储为键值对,然后对存储在 map 中的值进行条件列创建。 @Arjun_Jha 据我所知,您可以使用 map 使其与堆栈功能一起更加动态,但不知道没有堆栈。遍历所有列会增加参与度,但会检查我是否找到更有效的方法。 【参考方案1】:

您可以获得所需的输出,但这将是一个多步骤的过程。

首先,您必须从原始数据帧中创建两个单独的数据帧,即一个包含 schm 列,另一个包含 idsm 列。您将不得不取消透视 schm 列和 idsm 列。

然后,您将根据列的唯一组合加入两个数据框,并根据空值过滤数据框。然后,您将根据唯一列进行分组,并以 schm 列为轴,并获取 idsm 列的第一个值。

//Sample Data
import org.apache.spark.sql.functions._
val initialdf = Seq((2,6,"b1","id1","i","id2","xs","id3","ch","id4",null,null,null,null),(3,5,"b2","id5","x2","id6","ch","id7","be","id8",null,null,"db","id15"),(4,7,"b1","id9","ch","id10","xs","id11","us","id12","es","id00",null,null)).toDF("col1","col2","schm_0","idsm_0","schm_1","idsm_1","schm_2","idsm_2","schm_3","idsm_3","schm_4","idsm_4","schm_5","idsm_5")
//creating two separate dataframes
val schmdf = initialdf.selectExpr("col1","col2", "stack(6, 'schm_0',schm_0, 'schm_1',schm_1,'schm_2',schm_2,'schm_3' ,schm_3, 'schm_4',schm_4,'schm_5',schm_5) as (schm,schm_value)").withColumn("id",split($"schm", "_")(1))
val idsmdf = initialdf.selectExpr("col1","col2", "stack(6, 'idsm_0',idsm_0, 'idsm_1',idsm_1,'idsm_2',idsm_2,'idsm_3' ,idsm_3, 'idsm_4',idsm_4,'idsm_5',idsm_5) as (idsm,idsm_value)").withColumn("id",split($"idsm", "_")(1))
//joining two dataframes and applying filter operation and giving alias for the column names to be used in next operation
val df = schmdf.join(idsmdf,Seq("col1","col2","id"),"inner").filter($"idsm_value" =!= "null").select("col1","col2","schm","schm_value","idsm","idsm_value").withColumn("schm_value", concat(lit("col_"),$"schm_value"))

df.groupBy("col1","col2").pivot("schm_value").agg(first("idsm_value")).show

你可以看到如下输出:

+----+----+------+------+------+------+------+------+-----+------+------+------+
|col1|col2|col_b1|col_b2|col_be|col_ch|col_db|col_es|col_i|col_us|col_x2|col_xs|
+----+----+------+------+------+------+------+------+-----+------+------+------+
|   2|   6|   id1|  null|  null|   id4|  null|  null|  id2|  null|  null|   id3|
|   3|   5|  null|   id5|   id8|   id7|  id15|  null| null|  null|   id6|  null|
|   4|   7|   id9|  null|  null|  id10|  null|  id00| null|  id12|  null|  id11|
+----+----+------+------+------+------+------+------+-----+------+------+------+

使用地图更新答案:

如果您有 n 列并且您事先知道它们,则可以使用下面的方法,它比上述方法更通用。

//sample Data
val initialdf = Seq((2,6,"b1","id1","i","id2","xs","id3","ch","id4",null,null,null,null),(3,5,"b2","id5","x2","id6","ch","id7","be","id8",null,null,"db","id15"),(4,7,"b1","id9","ch","id10","xs","id11","us","id12","es","id00",null,null)).toDF("col1","col2","schm_0","idsm_0","schm_1","idsm_1","schm_2","idsm_2","schm_3","idsm_3","schm_4","idsm_4","schm_5","idsm_5")

import org.apache.spark.sql.functions._

val schmcols = Seq("schm_0", "schm_1", "schm_2","schm_3","schm_4","schm_5")
val schmdf = initialdf.select($"col1",$"col2", explode(array(
                schmcols.map(column =>
                    struct(    
                        lit(column).alias("schm"),
                        col(column).alias("schm_value")
                    )): _*
        )).alias("schmColumn"))
.withColumn("id",split($"schmColumn.schm", "_")(1))
.withColumn("schm",$"schmColumn.schm")
.withColumn("schm_value",$"schmColumn.schm_value").drop("schmColumn")

val idcols = Seq("idsm_0", "idsm_1", "idsm_2","idsm_3","idsm_4","idsm_5")
val idsmdf = initialdf.select($"col1",$"col2", explode(array(
              idcols.map(
                column =>
                    struct(    
                        lit(column).alias("idsm"),
                        col(column).alias("idsm_value")
                    )): _*
               )).alias("idsmColumn"))
.withColumn("id",split($"idsmColumn.idsm", "_")(1))
.withColumn("idsm",$"idsmColumn.idsm")
.withColumn("idsm_value",$"idsmColumn.idsm_value").drop("idsmColumn")

val df = schmdf.join(idsmdf,Seq("col1","col2","id"),"inner")
.filter($"idsm_value" =!= "null")
.select("col1","col2","schm","schm_value","idsm","idsm_value")
.withColumn("schm_value", concat(lit("col_"),$"schm_value"))

df.groupBy("col1","col2").pivot("schm_value")
.agg(first("idsm_value")).show

【讨论】:

以上是关于基于一百列对在 Spark 数据框中创建新列的主要内容,如果未能解决你的问题,请参考以下文章

如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?

检查字符串以在 spark 数据框中创建新列

在 spark scala 中对数据框的每一列进行排序

如何根据基于其他列的列对数据框进行排序[重复]

如何使用 R 语言在基于多个二进制变量的数据框中创建新变量?

Spark - 如何使用列对数据框中的字符串进行切片[重复]