如何根据具有相同 id 的另一行更新一行
Posted
技术标签:
【中文标题】如何根据具有相同 id 的另一行更新一行【英文标题】:how to update a row based on another row with same id 【发布时间】:2018-12-24 07:16:07 【问题描述】:使用 Spark 数据框,我想根据具有相同 id 的其他行更新行值。
例如, 我有以下记录,
id,value
1,10
1,null
1,null
2,20
2,null
2,null
我想得到如下结果
id,value
1,10
1,10
1,10
2,20
2,20
2,20
总而言之,某些行中的 value 列为 null,如果有另一行具有相同 id 且具有有效值的行,我想更新它们。
在sql中,我可以简单地用inner-join写一个更新语句,但是我在Spark-sql中没有找到同样的方法。
更新 combineCols a 内连接 combineCols b 在 a.id = b.id 设置 a.value = b.value (这就是我在sql中的做法)
【问题讨论】:
如果一个ID有多个值怎么办?例如,如果 id 1 的值不是 (10, null, null),而是 (10,30, null),那么应该发生什么? 只能为一个值或为空 【参考方案1】:让我们使用 SQL 方法来解决这个问题 -
myValues = [(1,10),(1,None),(1,None),(2,20),(2,None),(2,None)]
df = sqlContext.createDataFrame(myValues,['id','value'])
df.registerTempTable('table_view')
df1=sqlContext.sql(
'select id, sum(value) over (partition by id) as value from table_view'
)
df1.show()
+---+-----+
| id|value|
+---+-----+
| 1| 10|
| 1| 10|
| 1| 10|
| 2| 20|
| 2| 20|
| 2| 20|
+---+-----+
警告: 这些代码假定任何特定的id
都只有一个non-null
值。当我们groupby
值时,我们必须使用aggregation
函数,而我使用了sum
。如果任何id
有2 个non-null
值,则将求和。如果id
可以有多个non-null
值,那么最好使用min/max
,这样我们就可以得到其中一个值而不是sum
。
df1=sqlContext.sql(
'select id, max(value) over (partition by id) as value from table_view'
)
【讨论】:
谢谢,但实际上我在操作一个中间表,它有70多列和2000万行,用整个表的SQL来做效率高吗? 哦,是的,为什么不呢。 SQL 在内置优化中使用它自己的。我一定会使用它。但是,您也可以自己进行时间测试。请注意,我的代码假定对于任何特定的id
,您只有一个non-null
值,否则它将总结它们。对值进行分组时,需要提供聚合函数,我提供了sum.
如果两个答案中的任何一个对您有所帮助,您可以随时投票并接受您喜欢的答案。【参考方案2】:
您可以使用 window 来执行此操作(在 pyspark 中):
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# create dataframe
df = sc.parallelize([
[1,10],
[1,None],
[1,None],
[2,20],
[2,None],
[2,None],
]).toDF(('id', 'value'))
window = Window.partitionBy('id').orderBy(F.desc('value'))
df \
.withColumn('value', F.first('value').over(window)) \
.show()
结果:
+---+-----+
| id|value|
+---+-----+
| 1| 10|
| 1| 10|
| 1| 10|
| 2| 20|
| 2| 20|
| 2| 20|
+---+-----+
您可以在 scala 中使用相同的功能。
【讨论】:
谢谢:) 我还通过使用 groupby val df2 = df1.filter(!isnan($"value")).groupBy("id").agg(mean("value" ).as("update_value")) val result = df1.join(df2,Seq("id"),"inner").selectExpr("id","update_value") 有没有比使用均值函数更好的方法在这种情况下? 如果同一个 id 有多个值,例如 1,10 和 1,20,那么所有具有 1 id 的行的值都是 15。我从你的解释中了解到你不想要它。如果同一个 id 的所有值都为 null,您希望发生什么? 如果所有值都为null,则保持为null,如果不为null,则只有一个可能的值 如果所有值都为 null,则它在您的解决方案中变为 0。对于所有这些情况,您都可以使用第一个函数。以上是关于如何根据具有相同 id 的另一行更新一行的主要内容,如果未能解决你的问题,请参考以下文章
如何在pyspark中使用具有相同客户ID的新数据框的值更新一行旧数据框
使用相同的主键将值从一行添加到另一行 - Oracle SQL