基于其他列(即应用 CDC)将多个 spark 数据帧行组合成一个
Posted
技术标签:
【中文标题】基于其他列(即应用 CDC)将多个 spark 数据帧行组合成一个【英文标题】:Combine multiple spark dataframe rows into one based on other columns i.e. applying CDC 【发布时间】:2021-11-30 11:20:02 【问题描述】:我的 spark 数据框中有以下形式的数据。
+----+------+------+------+-----------+-------------------------+
| id | name | age | city | operation | update_time |
+----+------+------+------+-----------+-------------------------+
| 1 | jon | 12 | NULL | INSERT | 2021-10-11T16:11:00.378 |
+----+------+------+------+-----------+-------------------------+
| 1 | NULL | NULL | NY | UPDATE | 2021-10-11T17:11:00.378 |
+----+------+------+------+-----------+-------------------------+
| 1 | jack | NULL | NULL | UPDATE | 2021-10-11T18:11:00.378 |
+----+------+------+------+-----------+-------------------------+
| 2 | sam | 11 | TN | INSERT | 2021-10-11T18:11:00.378 |
+----+------+------+------+-----------+-------------------------+
| 3 | tim | NULL | CA | INSERT | 2021-10-11T16:11:00.378 |
+----+------+------+------+-----------+-------------------------+
| 3 | NULL | 33 | MT | UPDATE | 2021-10-11T17:11:00.378 |
+----+------+------+------+-----------+-------------------------+
我正在尝试在数据框中寻找可以帮助我将数据转换为以下形式的函数。但是什么也没找到。我能想到的最多的是连接,但它应该与多个数据框一起使用。但这里我只有一个。那么如何将行合并为一行并将所有更新的列值包含在一行中。
+----+------+-----+------+-------------------------+
| id | name | age | city | update_time |
+----+------+-----+------+-------------------------+
| 1 | jack | 12 | NY | 2021-10-11T18:11:00.378 |
+----+------+-----+------+-------------------------+
| 2 | sam | 11 | TN | 2021-10-11T18:11:00.378 |
+----+------+-----+------+-------------------------+
| 3 | tim | 33 | MT | 2021-10-11T17:11:00.378 |
+----+------+-----+------+-------------------------+
【问题讨论】:
我认为使用窗口函数和last
使用您的示例数据编写一些东西会很容易(我的意思是,我确实编写了一些适用于您的示例数据的代码)但我认为它也是简单化。例如,您如何假设所有记录都将始终具有 INSERT?我认为在 CDC 的情况下这不太可能。您不需要与主表进行比较而不仅仅是数据框吗?你可能应该看看Delta Lake。
是的,你是对的,可能存在我只有 UPDATE 而没有 INSERT 操作的情况。但我无法连接到数据库进行比较。
【参考方案1】:
package spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object DataFrameWindowFunc extends App
val spark = SparkSession
.builder()
.master("local")
.appName("DataFrame-WindowFunction")
.getOrCreate()
import spark.implicits._
case class Data(
id: Int,
name: Option[String],
age: Option[Int],
city: Option[String],
operation: String,
updateTime: String
)
val sourceData = Seq(
Data(1, Some("jon"), Some(12), None, "INSERT", "2021-10-11T16:11:00.378"),
Data(1, None, None, Some("NY"), "UPDATE", "2021-10-11T17:11:00.378"),
Data(1, Some("jack"), None, None, "UPDATE", "2021-10-11T18:11:00.378"),
Data(
2,
Some("sam"),
Some(11),
Some("TN"),
"INSERT",
"2021-10-11T18:11:00.378"
),
Data(3, Some("tim"), None, Some("CA"), "INSERT", "2021-10-11T16:11:00.378"),
Data(3, None, Some(33), Some("MT"), "UPDATE", "2021-10-11T17:11:00.378")
).toDF
sourceData.show(false)
// +---+----+----+----+---------+-----------------------+
// |id |name|age |city|operation|updateTime |
// +---+----+----+----+---------+-----------------------+
// |1 |jon |12 |null|INSERT |2021-10-11T16:11:00.378|
// |1 |null|null|NY |UPDATE |2021-10-11T17:11:00.378|
// |1 |jack|null|null|UPDATE |2021-10-11T18:11:00.378|
// |2 |sam |11 |TN |INSERT |2021-10-11T18:11:00.378|
// |3 |tim |null|CA |INSERT |2021-10-11T16:11:00.378|
// |3 |null|33 |MT |UPDATE |2021-10-11T17:11:00.378|
// +---+----+----+----+---------+-----------------------+
val res = sourceData
.orderBy(col("id"), col("updateTime").desc)
.groupBy("id")
.agg(
first(col("name"), true).alias("name"),
first(col("age"), true).alias("age"),
first(col("city"), true).alias("city"),
first(col("updateTime"), true).alias("update_time")
)
.orderBy(col("id"))
res.show(false)
// +---+----+---+----+-----------------------+
// |id |name|age|city|update_time |
// +---+----+---+----+-----------------------+
// |1 |jack|12 |NY |2021-10-11T18:11:00.378|
// |2 |sam |11 |TN |2021-10-11T18:11:00.378|
// |3 |tim |33 |MT |2021-10-11T17:11:00.378|
// +---+----+---+----+-----------------------+
【讨论】:
感谢您给我一个方向,所以如果正确理解代码,它将选择由 updated_time desc 排序的组中的第一个(),而与操作无关。因此,如果操作全部更新,它会将所有行分组并使其成为一行?抱歉,我是 scala 和 spark 的新手,所以理解代码需要时间。 是的。每个 id 一行。 好的,但是哪行代码可以折叠列值并选择最新的值。例如乔恩和杰克 orderBy(...) - 对 id 和插入/更新时间从最后一行到第一行进行排序。 groupBy(...) - 组 id 的一行。 first(col, true) - 如果不为空,则为组提供第一个值列。使用 agg() 函数,我们可以一次计算多个聚合。以上是关于基于其他列(即应用 CDC)将多个 spark 数据帧行组合成一个的主要内容,如果未能解决你的问题,请参考以下文章
将 UDF 应用于 Spark Dataframe 中的多个列
使用 PySpark 将多个数字列拟合到 spark-ml 模型中