Scala Spark: Join DataFrames in a Loop
Posted: 2020-06-28 21:19:22

Question: I am trying to join DataFrames on the fly inside a loop. I use a properties file to get the details of the columns that should appear in the final DataFrame.
Properties file:
a01=status:single,perm_id:multi
a02=status:single,actv_id:multi
a03=status:single,perm_id:multi,actv_id:multi
............................
............................
For each line in the properties file I need to build a DataFrame and save it to a file. The properties file is loaded with PropertiesReader. If the mode is single, I only need the column's value from the table; if the mode is multi, I need the values collected as a list.
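For context, loading such a file can be done with plain java.util.Properties (the question mentions PropertiesReader, so the exact loading API below is an assumption):

import java.io.FileInputStream
import java.util.Properties

// Load the properties file shown above; the path is a placeholder.
val properties = new Properties()
properties.load(new FileInputStream("/path/to/columns.properties"))
// properties.get("a01") now returns "status:single,perm_id:multi"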
val propertyColumn = properties.get("a01") //a01 value we are getting as an argument. This might be a01,a02 or a0n
val columns = propertyColumn.toString.split(",").map(_.toString)
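For a01, for example, this gives one entry per column:mode pair:

// columns == Array("status:single", "perm_id:multi")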
The act_det table:
+-------+--------+-----------+-----------+-----------+------------+
|id |act_id |status |perm_id |actv_id | debt_id |
+-------+--------+-----------+-----------+-----------+------------+
| 1 |1 | 4 | 1 | 10 | 1 |
+-------+--------+-----------+-----------+-----------+------------+
| 2 |1 | 4 | 2 | 20 | 2 |
+-------+--------+-----------+-----------+-----------+------------+
| 3 |1 | 4 | 3 | 30 | 1 |
+-------+--------+-----------+-----------+-----------+------------+
| 4 |2 | 4 | 5 | 10 | 3 |
+-------+--------+-----------+-----------+-----------+------------+
| 5 |2 | 4 | 6 | 20 | 1 |
+-------+--------+-----------+-----------+-----------+------------+
| 6 |2 | 4 | 7 | 30 | 1 |
+-------+--------+-----------+-----------+-----------+------------+
| 7 |3 | 4 | 1 | 10 | 3 |
+-------+--------+-----------+-----------+-----------+------------+
| 8 |3 | 4 | 5 | 20 | 1 |
+-------+--------+-----------+-----------+-----------+------------+
| 9 |3 | 4 | 2 | 30 | 3 |
+-------+--------+-----------+-----------+-----------+------------+
Main DataFrame:
val data = sqlContext.sql("select * from act_det")
I want the following output:
For a01:
+-------+--------+-----------+
|act_id |status |perm_id |
+-------+--------+-----------+
| 1 | 4 | [1,2,3] |
+-------+--------+-----------+
| 2 | 4 | [5,6,7] |
+-------+--------+-----------+
| 3 | 4 | [1,5,2] |
+-------+--------+-----------+
For a02:
+-------+--------+-----------+
|act_id |status |actv_id |
+-------+--------+-----------+
| 1 | 4 | [10,20,30]|
+-------+--------+-----------+
| 2 | 4 | [10,20,30]|
+-------+--------+-----------+
| 3 | 4 | [10,20,30]|
+-------+--------+-----------+
For a03:
+-------+--------+-----------+-----------+
|act_id |status |perm_id |actv_id |
+-------+--------+-----------+-----------+
| 1 | 4 | [1,2,3] |[10,20,30] |
+-------+--------+-----------+-----------+
| 2 | 4 | [5,6,7] |[10,20,30] |
+-------+--------+-----------+-----------+
| 3 | 4 | [1,5,2] |[10,20,30] |
+-------+--------+-----------+-----------+
However, the DataFrame creation process should be dynamic. I tried the code below, but I could not get the join logic for the DataFrames to work inside the loop.
val finalDF: DataFrame = ??? // empty DataFrame to start from

for (column <- columns) yield {
  val eachColumn = column.split(":")
  val columnName = eachColumn(0)
  val mode = eachColumn(1)

  if (mode.equalsIgnoreCase("single")) {
    data.select($"act_id", $"status").distinct
    // I want to join finalDF with data.select($"act_id", $"status").distinct
  } else if (mode.equalsIgnoreCase("multi")) {
    data.groupBy($"act_id").agg(collect_list($"perm_id").as("perm_id"))
    // I want to join finalDF with data.groupBy($"act_id").agg(collect_list($"perm_id").as("perm_id"))
  }
}
Any suggestions or guidance would be greatly appreciated.
Comments:
Just create an intermediate table of act_id, perm_id and join it with the DataFrame in the else-if branch.
Can you add the initial DataFrame the result is computed from?
Added the main table records.
Avijit, I have added a solution below, please check.
Thanks @Srinivas. I will implement the solution you shared and let you know.

Answer 1: Check the code below.
scala> df.show(false)
+---+------+------+-------+-------+-------+
|id |act_id|status|perm_id|actv_id|debt_id|
+---+------+------+-------+-------+-------+
|1 |1 |4 |1 |10 |1 |
|2 |1 |4 |2 |20 |2 |
|3 |1 |4 |3 |30 |1 |
|4 |2 |4 |5 |10 |3 |
|5 |2 |4 |6 |20 |1 |
|6 |2 |4 |7 |30 |1 |
|7 |3 |4 |1 |10 |3 |
|8 |3 |4 |5 |20 |1 |
|9 |3 |4 |2 |30 |3 |
+---+------+------+-------+-------+-------+
Define the primary keys:
scala> val primary_key = Seq("act_id").map(col(_))
primary_key: Seq[org.apache.spark.sql.Column] = List(act_id)
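Because primary_key is a Seq[Column], it can later be expanded into any varargs column parameter with :_*, for example:

scala> df.groupBy(primary_key:_*).count.show(false)   // groups on act_id only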
The configs:
scala> configs.foreach(println)
/*
(a01,status:single,perm_id:multi)
(a02,status:single,actv_id:multi)
(a03,status:single,perm_id:multi,actv_id:multi)
*/
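The snippet does not show how configs itself was built; assuming the same properties file from the question has been loaded into a java.util.Properties object named properties, one way (the names here are illustrative) is:

scala> import scala.collection.JavaConverters._
scala> val configs: Map[String, String] = properties.asScala.toMap
// e.g. Map(a01 -> status:single,perm_id:multi, a02 -> ..., a03 -> ...)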
Build the aggregation expressions.
scala> val columns = configs.map { c =>
         c._2
           .split(",")
           .map { e =>
             val cc = e.split(":")
             if (cc.tail.contains("single"))
               first(col(cc.head)).as(cc.head)          // "single" -> keep the (first) value
             else
               collect_list(col(cc.head)).as(cc.head)   // "multi"  -> collect the values into a list
           }
       }
/*
columns: scala.collection.immutable.Iterable[Array[org.apache.spark.sql.Column]] = List(
Array(first(status, false) AS `status`, collect_list(perm_id) AS `perm_id`),
Array(first(status, false) AS `status`, collect_list(actv_id) AS `actv_id`),
Array(first(status, false) AS `status`, collect_list(perm_id) AS `perm_id`, collect_list(actv_id) AS `actv_id`)
)
*/
Final result:
scala> columns.map(c => df.groupBy(primary_key:_*).agg(c.head,c.tail:_*)).map(_.show(false))
+------+------+---------+
|act_id|status|perm_id |
+------+------+---------+
|3 |4 |[1, 5, 2]|
|1 |4 |[1, 2, 3]|
|2 |4 |[5, 6, 7]|
+------+------+---------+
+------+------+------------+
|act_id|status|actv_id |
+------+------+------------+
|3 |4 |[10, 20, 30]|
|1 |4 |[10, 20, 30]|
|2 |4 |[10, 20, 30]|
+------+------+------------+
+------+------+---------+------------+
|act_id|status|perm_id |actv_id |
+------+------+---------+------------+
|3 |4 |[1, 5, 2]|[10, 20, 30]|
|1 |4 |[1, 2, 3]|[10, 20, 30]|
|2 |4 |[5, 6, 7]|[10, 20, 30]|
+------+------+---------+------------+
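The question also asks to save each resulting DataFrame to a file. One way, pairing each set of aggregation expressions with its config key (the output path and format below are assumptions, not part of the original answer), is:

scala> configs.keys.zip(columns).foreach { case (key, aggs) =>
         df.groupBy(primary_key:_*)
           .agg(aggs.head, aggs.tail:_*)
           .write
           .mode("overwrite")
           .parquet(s"/output/path/$key")   // placeholder output location and format
       }

And if you still prefer to build one DataFrame per column and join them in a loop, as the question originally attempted, the usual pattern is to reduce over joins on the key column, for example:

scala> val parts = Seq(
         df.select($"act_id", $"status").distinct,
         df.groupBy($"act_id").agg(collect_list($"perm_id").as("perm_id")))
scala> val joined = parts.reduce(_.join(_, Seq("act_id")))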