如何合并来自多行的值,以便它们可以一起处理 - Spark scala

Posted

技术标签:

【中文标题】如何合并来自多行的值,以便它们可以一起处理 - Spark scala【英文标题】:How to Merge values from multiple rows so they can be processed together - Spark scala 【发布时间】:2017-01-20 16:21:07 【问题描述】:

每个 personId 我有多个数据库行,其中的列可能有也可能没有值 - 我在这里使用颜色,因为数据是文本而不是数字,因此不适合内置聚合函数。一个简化的例子是

PersonId    ColA    ColB    ColB
100         red
100                 green
100                         gold
100         green
110                 yellow
110         white
110   
120         
etc...

我希望能够在函数中决定每个唯一的 PersonId 使用哪个列数据。如果数据每列没有多个值(颜色),则对表进行三向连接将是一个很好的解决方案。例如。该连接将 3 行合并为一行,但仍会产生多行。

PersonId    ColA    ColB    ColB
100         red     green   gold
100         green                                   
110         white   yellow
110   
120

所以我正在寻找的解决方案是允许我在一个地方(函数)处理一个人的所有值(颜色),以便可以根据他们的所有数据做出决定。 真实数据当然有更多列,但这个决定的主要数据是三列。数据在 Scala Spark 中作为 Dataframe 读取,我更喜欢使用 API 到 sql。我不知道任何异国情调的窗口或 groupby 功能是否会有所帮助,或者它是否会归结为简单的旧迭代和累积。 [How to aggregate values into collection after groupBy? 中使用的技术可能适用,但有点飞跃。

【问题讨论】:

我没有时间给出完整的答案,但是您是否查看过滞后/领先或其他窗口操作 为什么在这个例子中,100 的第二行是绿色,第一行是红绿金,而不是第一行是红色,第二行是绿绿金?你能详细解释一下你想如何组合它们吗? 这只是显示了 3-way join 可能产生的结果。我最初认为 3-way (self) join 只会返回 1 行 - 除非每列在其他行中具有不同的值,否则它会返回。真正的问题只是应对分布在多行多列的数据。 我主要在 Java Spark 中使用 RDD,在那种环境中,您可以先执行一个“flatMap”将此 RDD 映射到一个配对 RDD,其中您有 PersonID 作为键和另一列表示与键关联的颜色。然后你可以'AggragateByKey' 并得到你需要的东西。如果您认为这种方法可能会有所帮助,我可以详细说明。 拜托,这听起来很有希望 - 尽管我需要三个信息,即 PersonId、颜色和颜色所在的列。在哪一列中选择哪种颜色很重要。 【参考方案1】:

考虑使用 customUDF 来执行此操作。

import org.apache.spark.sql.functions._
val df = Seq((100, "red", null, null), (100, null, "white", null), (100, null, null, "green"), (200, null, "red", null)).toDF("PID", "A", "B", "C")

df.show()
+---+----+-----+-----+
|PID|   A|    B|    C|
+---+----+-----+-----+
|100| red| null| null|
|100|null|white| null|
|100|null| null|green|
|200|null|  red| null|
+---+----+-----+-----+

val customUDF = udf((array: Seq[String]) => 
    val newts = array.filter(_.nonEmpty)
    if  (newts.size == 0) null
    else newts.head
)

df.groupBy($"PID").agg(customUDF(collect_set($"A")).as("colA"), customUDF(collect_set($"B")).as("colB"), customUDF(collect_set($"C")).as("colC")).show

+---+----+-----+-----+
|PID|colA| colB| colC|
+---+----+-----+-----+
|100| red|white|green|
|200|null|  red| null|
+---+----+-----+-----+


【讨论】:

以上是关于如何合并来自多行的值,以便它们可以一起处理 - Spark scala的主要内容,如果未能解决你的问题,请参考以下文章

如何使用来自 1 列的数据创建 2 列并合并它们

PostgreSQL:如何将多行的值放在不同的列中,并将所有值合并到一行中?

如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)

如何使用 pandas 将多行字符串合并为一个?

同时更新多行 SQL server

如何隔离受污染的合并主题分支上的提交,以便将它们应用于不同的分支?