Create new columns in Spark dataframe based on a hundred column pairs
Posted: 2022-01-19 15:31:19

Problem description: I am trying to create around 9-10 new columns based on the values found in 100 columns (schm0, schm1...schm100), but the values of these new columns will come from the columns (idsm0, idsm1....idsm100) that are part of the same dataframe.
Besides these two sets of 100 columns there are other columns as well. The problem is that not all of the scheme columns (schm0, schm1..schm100) will have values; we have to go through every one of them to find the values and create the new columns accordingly, and 85+ of them are empty most of the time, so we need to ignore those.
Sample input dataframe:
+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+
|col1|col2|col3|schm0|idsm0|schm1|idsm1|schm2|idsm2|schm3|idsm3|
+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+
|   a|   b|   c|    0|    1|    2|    3|    4|    5| null| null|
+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+
schm and idsm can go up to 100, so it is essentially 100 key-value column pairs.
Expected output:
+----+----+----+----------+-------+-------+
|col1|col2|col3|found_zero|found_2|found_4|
+----+----+----+----------+-------+-------+
|   a|   b|   c|         1|      3|      5|
+----+----+----+----------+-------+-------+
Note: no column has a fixed value; any column can hold any value. The columns we create must be based on the values found in any of the scheme columns (schm0...schm100), and the value in each created column will be the corresponding value of that scheme, i.e. the idsymbol (idsm0...idsm100).
I am finding it hard to come up with a plan for this; any help would be greatly appreciated.
Edit - adding another input sample --
+----+----+------+------+------+------+------+------+------+------+------+------+------+------+
|col1|col2|schm_0|idsm_0|schm_1|idsm_1|schm_2|idsm_2|schm_3|idsm_3|schm_4|idsm_4|schm_5|idsm_5|
+----+----+------+------+------+------+------+------+------+------+------+------+------+------+
|   2|   6|    b1|   id1|     i|   id2|    xs|   id3|    ch|   id4|  null|  null|  null|  null|
|   3|   5|    b2|   id5|    x2|   id6|    ch|   id7|    be|   id8|  null|  null|    db|  id15|
|   4|   7|    b1|   id9|    ch|  id10|    xs|  id11|    us|  id12|  null|  null|  null|  null|
+----+----+------+------+------+------+------+------+------+------+------+------+------+------+
For a particular record, the columns (schm_0, schm_1....schm_100) can have around 9 to 10 unique values, since not all of the columns will be populated.
We need to create 9 different columns based on those 9 unique values, so for each row we need to go through each of the 100 scheme columns, collect all the values found there, and create a separate column for each value found... and the values in those created columns will be the values from the idsm columns (idsm_0, idsm_1....idsm_100).
That is, if schm_0 holds the value "cb", we need to create a new column, for example "col_cb", and the value of this "col_cb" column will be the value from the "idsm_0" column. We need to do the same for all 100 columns (omitting the null ones).
Expected output -
+----+----+------+------+-----+------+------+------+------+------+------+
|col1|col2|col_b1|col_b2|col_i|col_x2|col_ch|col_xs|col_be|col_us|col_db|
+----+----+------+------+-----+------+------+------+------+------+------+
|   2|   6|   id1|  null|  id2|  null|   id4|   id3|  null|  null|  null|
|   3|   5|  null|   id5| null|   id6|   id7|  null|   id8|  null|  id15|
|   4|   7|   id9|  null| null|  null|  id10|  id11|  null|  id12|  null|
+----+----+------+------+-----+------+------+------+------+------+------+
Hope this makes the problem statement clearer. Any help on this would be greatly appreciated.
Editing again for a small addition -
As we can see in the example above, the columns we create are based on the values found in the scheme columns, and there is already a defined set of columns that will be created, 10 in number; the columns would be, for example, (col_a, col_b, col_c, col_d, col_e, col_f, col_g, col_h, col_i, col_j).
Not all 10 keywords, i.e. (a, b, c....j), will always be present in the dataset under (scheme0.....scheme99).
The requirement is that we need to output all 10 columns; if some of the keys (a, b, c...j) are not present, the columns created for them should simply hold null values.
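To make the shape of this concrete, here is only a rough, untested sketch of the kind of logic I have in mind (it assumes Spark 3.x, shows just the first three schm/idsm pairs, and uses a hypothetical input dataframe named inputDf): collect the non-null (schm, idsm) pairs of each row into a map column and look up each of the ten fixed keys, so that missing keys naturally come out as null.
import org.apache.spark.sql.functions._
//only the first three pairs are listed here; the real data goes up to schm_99/idsm_99
val pairs = (0 to 2).map(i => (s"schm_$i", s"idsm_$i"))
//pack each (schm, idsm) pair into a struct, drop entries whose key is null,
//and turn the remainder into a map column (uses the Spark 3.x higher-order filter function)
val entries = array(pairs.map { case (s, v) => struct(col(s).as("key"), col(v).as("value")) }: _*)
val nonNull = filter(entries, e => e.getField("key").isNotNull)
val schmMap = map_from_entries(nonNull)
//the ten fixed keys from the edit above; element_at on a map returns null for missing keys
val fixedKeys = Seq("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
val result = fixedKeys.foldLeft(inputDf.withColumn("schm_map", schmMap)) { (acc, k) =>
  acc.withColumn(s"col_$k", element_at(col("schm_map"), k))
}
I do not know whether something like this is workable or efficient for 100 pairs, which is why I am asking.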
Comments:
Could you give or expand your example with multiple records as input and the output you expect? Seeing only the output for a single record plus your description is confusing.
@NikunjKakadiya, sorry for not making it clear, I have added more examples.. hope that helps.
Check the answer I posted..
@NikunjKakadiya thanks for the quick response. This seems to work for a smaller set of scheme and idsymbol columns, but as I mentioned, in the real case I have a hundred of each, so would it be wise to put all 100 of those columns in the stack? Or could we try something like a Map, where we create the 100+ columns by storing scheme and idsm as key-value pairs and then do the conditional column creation on the values stored in the map.
@Arjun_Jha as far as I know you can use a map together with the stack function to make it more dynamic, but I do not know of a way without stack. Iterating over all the columns adds overhead, but I will check whether I can find a more efficient approach.
Answer 1:
You can get the desired output, but it will be a multi-step process.
First, you have to create two separate dataframes from the original dataframe: one containing the schm columns and one containing the idsm columns. You will have to unpivot the schm columns and the idsm columns.
Then you join the two dataframes on the unique combination of columns and filter out the null values. Finally you group by the unique columns, pivot on the schm column, and take the first value of the idsm column.
//Sample Data
import org.apache.spark.sql.functions._
val initialdf = Seq((2,6,"b1","id1","i","id2","xs","id3","ch","id4",null,null,null,null),(3,5,"b2","id5","x2","id6","ch","id7","be","id8",null,null,"db","id15"),(4,7,"b1","id9","ch","id10","xs","id11","us","id12","es","id00",null,null)).toDF("col1","col2","schm_0","idsm_0","schm_1","idsm_1","schm_2","idsm_2","schm_3","idsm_3","schm_4","idsm_4","schm_5","idsm_5")
//creating two separate dataframes
val schmdf = initialdf.selectExpr("col1","col2", "stack(6, 'schm_0',schm_0, 'schm_1',schm_1,'schm_2',schm_2,'schm_3' ,schm_3, 'schm_4',schm_4,'schm_5',schm_5) as (schm,schm_value)").withColumn("id",split($"schm", "_")(1))
val idsmdf = initialdf.selectExpr("col1","col2", "stack(6, 'idsm_0',idsm_0, 'idsm_1',idsm_1,'idsm_2',idsm_2,'idsm_3' ,idsm_3, 'idsm_4',idsm_4,'idsm_5',idsm_5) as (idsm,idsm_value)").withColumn("id",split($"idsm", "_")(1))
//joining two dataframes and applying filter operation and giving alias for the column names to be used in next operation
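//note: the sample data contains real nulls rather than the string "null"; since null =!= "null" evaluates to null and filter drops rows where the condition is null, those rows are excluded either way, but $"idsm_value".isNotNull would state the intent more directly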
val df = schmdf.join(idsmdf,Seq("col1","col2","id"),"inner").filter($"idsm_value" =!= "null").select("col1","col2","schm","schm_value","idsm","idsm_value").withColumn("schm_value", concat(lit("col_"),$"schm_value"))
df.groupBy("col1","col2").pivot("schm_value").agg(first("idsm_value")).show
You can see the output as below:
+----+----+------+------+------+------+------+------+-----+------+------+------+
|col1|col2|col_b1|col_b2|col_be|col_ch|col_db|col_es|col_i|col_us|col_x2|col_xs|
+----+----+------+------+------+------+------+------+-----+------+------+------+
|   2|   6|   id1|  null|  null|   id4|  null|  null|  id2|  null|  null|   id3|
|   3|   5|  null|   id5|   id8|   id7|  id15|  null| null|  null|   id6|  null|
|   4|   7|   id9|  null|  null|  id10|  null|  id00| null|  id12|  null|  id11|
+----+----+------+------+------+------+------+------+-----+------+------+------+
Updated answer using a map:
If you have n columns and you know them beforehand, you can use the approach below, which is more generic than the one above.
//sample Data
val initialdf = Seq((2,6,"b1","id1","i","id2","xs","id3","ch","id4",null,null,null,null),(3,5,"b2","id5","x2","id6","ch","id7","be","id8",null,null,"db","id15"),(4,7,"b1","id9","ch","id10","xs","id11","us","id12","es","id00",null,null)).toDF("col1","col2","schm_0","idsm_0","schm_1","idsm_1","schm_2","idsm_2","schm_3","idsm_3","schm_4","idsm_4","schm_5","idsm_5")
import org.apache.spark.sql.functions._
val schmcols = Seq("schm_0", "schm_1", "schm_2","schm_3","schm_4","schm_5")
val schmdf = initialdf.select($"col1",$"col2", explode(array(
schmcols.map(column =>
struct(
lit(column).alias("schm"),
col(column).alias("schm_value")
)): _*
)).alias("schmColumn"))
.withColumn("id",split($"schmColumn.schm", "_")(1))
.withColumn("schm",$"schmColumn.schm")
.withColumn("schm_value",$"schmColumn.schm_value").drop("schmColumn")
val idcols = Seq("idsm_0", "idsm_1", "idsm_2","idsm_3","idsm_4","idsm_5")
val idsmdf = initialdf.select($"col1",$"col2", explode(array(
idcols.map(
column =>
struct(
lit(column).alias("idsm"),
col(column).alias("idsm_value")
)): _*
)).alias("idsmColumn"))
.withColumn("id",split($"idsmColumn.idsm", "_")(1))
.withColumn("idsm",$"idsmColumn.idsm")
.withColumn("idsm_value",$"idsmColumn.idsm_value").drop("idsmColumn")
val df = schmdf.join(idsmdf,Seq("col1","col2","id"),"inner")
.filter($"idsm_value" =!= "null")
.select("col1","col2","schm","schm_value","idsm","idsm_value")
.withColumn("schm_value", concat(lit("col_"),$"schm_value"))
df.groupBy("col1","col2").pivot("schm_value")
.agg(first("idsm_value")).show
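To cover the points raised in the comments and in the last edit (a hundred pairs instead of six, and a fixed set of ten output columns even when some keys never appear), the same approach could roughly be extended as below. This is an untested sketch: the column lists are generated instead of hard-coded (plug them into the schmcols/idcols definitions above), and the key list a..j is taken from the question's last edit as an assumption.
//generate the column lists for all 100 pairs instead of writing them out by hand
val n = 100
val schmcols = (0 until n).map(i => s"schm_$i")
val idcols = (0 until n).map(i => s"idsm_$i")
//fixed set of output columns (keys a..j, prefixed the same way as schm_value);
//passing the values to pivot() keeps absent keys as null columns and also skips
//the extra job Spark would otherwise run to discover the distinct pivot values
val expectedCols = Seq("a", "b", "c", "d", "e", "f", "g", "h", "i", "j").map("col_" + _)
df.groupBy("col1","col2")
  .pivot("schm_value", expectedCols)
  .agg(first("idsm_value")).show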