复制当前行,修改它并在火花中添加一个新行

Posted

技术标签:

【中文标题】复制当前行,修改它并在火花中添加一个新行【英文标题】:copy current row , modify it and add a new row in spark 【发布时间】:2020-07-24 13:28:40 【问题描述】:

我正在使用带有 java8 版本的 spark-sql-2.4.1v。 我有一个场景,我需要复制当前行并创建另一行修改几列数据如何在 spark-sql 中实现?

例如: 给定

 val data = List(
  ("20", "score", "school",  14 ,12),
  ("21", "score", "school",  13 , 13),
  ("22", "rate", "school",  11 ,14)
 )
val df = data.toDF("id", "code", "entity", "value1","value2")

电流输出

+---+-----+------+------+------+
| id| code|entity|value1|value2|
+---+-----+------+------+------+
| 20|score|school|    14|    12|
| 21|score|school|    13|    13|
| 22| rate|school|    11|    14|
+---+-----+------+------+------+

当列“code”为“rate”时,将其复制为两行,即一列是 原始的,第二个是新代码“old_rate”的另一行,例如 下面

预期输出:

+---+--------+------+------+------+
| id|    code|entity|value1|value2|
+---+--------+------+------+------+
| 20|   score|school|    14|    12|
| 21|   score|school|    13|    13|
| 22|    rate|school|    11|    14|
| 22|new_rate|school|    11|    14|
+---+--------+------+------+------+

如何做到这一点?

【问题讨论】:

【参考方案1】:

您可以将此方法用于您的场景,

df.union(df.filter($"code"==="rate").withColumn("code",concat(lit("new_"), $"code"))).show()
/*
+---+--------+------+------+------+
| id|    code|entity|value1|value2|
+---+--------+------+------+------+
| 20|   score|school|    14|    12|
| 21|   score|school|    13|    13|
| 22|    rate|school|    11|    14|
| 22|new_rate|school|    11|    14|
+---+--------+------+------+------+
*/

【讨论】:

如何在 spark 中处理这个用例,任何建议请***.com/questions/63127722/… 非常感谢,我还有一个这样的用例,任何建议请***.com/questions/63137437/… 你能给我同样的建议吗***.com/questions/63450135/… 你能告诉我这个广播变量访问有什么问题吗? ***.com/questions/64003697/…【参考方案2】:

使用when检查code === rate,如果匹配则将该列值替换为array(lit("rate"),lit("new_rate"))&不匹配的列值array($"code")然后分解code列。

检查下面的代码。

scala> df.show(false)
+---+-----+------+------+------+
|id |code |entity|value1|value2|
+---+-----+------+------+------+
|20 |score|school|14    |12    |
|21 |score|school|13    |13    |
|22 |rate |school|11    |14    |
+---+-----+------+------+------+
val colExpr = explode(
    when(
        $"code" === "rate",
        array(
            lit("rate"),
            lit("new_rate")
        )
    )
    .otherwise(array($"code"))
)
scala> df.withColumn("code",colExpr).show(false)
+---+--------+------+------+------+
|id |code    |entity|value1|value2|
+---+--------+------+------+------+
|20 |score   |school|14    |12    |
|21 |score   |school|13    |13    |
|22 |rate    |school|11    |14    |
|22 |new_rate|school|11    |14    |
+---+--------+------+------+------+

【讨论】:

非常感谢斯里尼,它奏效了。需要将其转换为 java...非常感谢.. 这是做什么的 ".otherwise(array($"code"))" ?? 如何在 spark 中处理这个用例,任何建议请***.com/questions/63127722/… 接受了答案,我现在的问题是如何查找 value1 value2 列...卡在那里。

以上是关于复制当前行,修改它并在火花中添加一个新行的主要内容,如果未能解决你的问题,请参考以下文章

如何避免火花提交缓存

如何将火花数据帧的“第一”行复制到另一个数据帧?为啥我的最小示例失败了?

如何将火花行(StructType)投射到scala案例类

如何在火花流中添加 2 行具有相同键(列值)的行?

如何将新列和相应的行特定值添加到火花数据帧?

如何截断火花数据框列的值? [复制]