如何根据具有相同 id 的另一行更新一行

Posted

技术标签:

【中文标题】如何根据具有相同 id 的另一行更新一行【英文标题】:how to update a row based on another row with same id 【发布时间】:2018-12-24 07:16:07 【问题描述】:

使用 Spark 数据框,我想根据具有相同 id 的其他行更新行值。

例如, 我有以下记录,

id,value
1,10
1,null
1,null
2,20
2,null
2,null

我想得到如下结果

id,value
1,10
1,10
1,10
2,20
2,20
2,20

总而言之,某些行中的 value 列为 null,如果有另一行具有相同 id 且具有有效值的行,我想更新它们。

在sql中,我可以简单地用inner-join写一个更新语句,但是我在Spark-sql中没有找到同样的方法。

更新 combineCols a 内连接 combineCols b 在 a.id = b.id 设置 a.value = b.value (这就是我在sql中的做法)

【问题讨论】:

如果一个ID有多个值怎么办?例如,如果 id 1 的值不是 (10, null, null),而是 (10,30, null),那么应该发生什么? 只能为一个值或为空 【参考方案1】:

让我们使用 SQL 方法来解决这个问题 -

myValues = [(1,10),(1,None),(1,None),(2,20),(2,None),(2,None)]
df = sqlContext.createDataFrame(myValues,['id','value'])

df.registerTempTable('table_view')
df1=sqlContext.sql(
    'select id, sum(value) over (partition by id) as value from table_view'
)
df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  1|   10|
|  1|   10|
|  2|   20|
|  2|   20|
|  2|   20|
+---+-----+

警告: 这些代码假定任何特定的id 都只有一个non-null 值。当我们groupby 值时,我们必须使用aggregation 函数,而我使用了sum。如果任何id 有2 个non-null 值,则将求和。如果id 可以有多个non-null 值,那么最好使用min/max,这样我们就可以得到其中一个值而不是sum

df1=sqlContext.sql(
    'select id, max(value) over (partition by id) as value from table_view'
)

【讨论】:

谢谢,但实际上我在操作一个中间表,它有70多列和2000万行,用整个表的SQL来做效率高吗? 哦,是的,为什么不呢。 SQL 在内置优化中使用它自己的。我一定会使用它。但是,您也可以自己进行时间测试。请注意,我的代码假定对于任何特定的id,您只有一个non-null 值,否则它将总结它们。对值进行分组时,需要提供聚合函数,我提供了sum. 如果两个答案中的任何一个对您有所帮助,您可以随时投票并接受您喜欢的答案。【参考方案2】:

您可以使用 window 来执行此操作(在 pyspark 中):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# create dataframe
df = sc.parallelize([
    [1,10],
    [1,None],
    [1,None],
    [2,20],
    [2,None],
    [2,None],
]).toDF(('id', 'value'))

window = Window.partitionBy('id').orderBy(F.desc('value'))
df \
    .withColumn('value', F.first('value').over(window)) \
    .show()

结果:

+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  1|   10|
|  1|   10|
|  2|   20|
|  2|   20|
|  2|   20|
+---+-----+

您可以在 scala 中使用相同的功能。

【讨论】:

谢谢:) 我还通过使用 groupby val df2 = df1.filter(!isnan($"value")).groupBy("id").agg(mean("value" ).as("update_value")) val result = df1.join(df2,Seq("id"),"inner").selectExpr("id","update_value") 有没有比使用均值函数更好的方法在这种情况下? 如果同一个 id 有多个值,例如 1,10 和 1,20,那么所有具有 1 id 的行的值都是 15。我从你的解释中了解到你不想要它。如果同一个 id 的所有值都为 null,您希望发生什么? 如果所有值都为null,则保持为null,如果不为null,则只有一个可能的值 如果所有值都为 null,则它在您的解决方案中变为 0。对于所有这些情况,您都可以使用第一个函数。

以上是关于如何根据具有相同 id 的另一行更新一行的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中使用具有相同客户ID的新数据框的值更新一行旧数据框

使用相同的主键将值从一行添加到另一行 - Oracle SQL

使用特定列相同的另一行的值更新列值

如何从具有相同 id 的多行中选择仅一行的值并用 - 分隔它们? [复制]

如何将具有相同列值的mysql行分组为一行?

Mysql - 仅返回具有相同唯一 ID 的第一行