Spark Dataframe - Write a new record for a change in VALUE for a particular KEY group
Posted: 2017-11-19 04:44:24

[Question]: A record needs to be written whenever the "AMT" column changes within a particular "KEY" group.
For example:
Scenario 1: For KEY=2, the first change is from 90 to 20, so a record with value (20 - 90) needs to be written.
Similarly, the next change for the same key group is from 20 to 30.5, so another record with value (30.5 - 20) needs to be written.
Scenario 2: For KEY=1, there is only one record for this KEY group, so write it as-is.
Scenario 3: For KEY=3, the same AMT value exists twice, so write it only once.
How can this be achieved? With a window function, or with a groupBy aggregation?
Sample input data:
val DF1 = List((1,34.6),(2,90.0),(2,90.0),(2,20.0),(2,30.5),(3,89.0),(3,89.0)).toDF("KEY", "AMT")
DF1.show(false)
+-----+-------------------+
|KEY |AMT |
+-----+-------------------+
|1 |34.6 |
|2 |90.0 |
|2 |90.0 |
|2 |20.0 |----->[ 20.0 - 90.0 = -70.0 ]
|2 |30.5 |----->[ 30.5 - 20.0 = 10.5 ]
|3 |89.0 |
|3 |89.0 |
+-----+-------------------+
Expected output:
scala> df2.show()
+----+--------------------+
|KEY | AMT |
+----+--------------------+
| 1 | 34.6 |-----> As Is
| 2 | -70.0 |----->[ 20.0 - 90.0 = -70.0 ]
| 2 | 10.5 |----->[ 30.5 - 20.0 = 10.5 ]
| 3 | 89.0 |-----> As Is, with one record only
+----+--------------------+
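For reference, below is a minimal Scala sketch of one way to produce the expected output, combining a window function (for the differences) with a groupBy aggregation (for the keys that never change). It assumes that monotonically_increasing_id() captures the row order shown above (an explicit ordering column would be safer) and that spark.implicits._ is in scope, as in spark-shell; the names rowNo, prevAMT, nDistinct, changes and unchanged are made up for illustration.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Capture the original row order so that "previous AMT" is well defined per KEY.
val withId = DF1.withColumn("rowNo", monotonically_increasing_id())
val w = Window.partitionBy("KEY").orderBy("rowNo")

// Rows whose AMT differs from the previous AMT of the same KEY -> write the difference.
val changes = withId
  .withColumn("prevAMT", lag("AMT", 1).over(w))
  .filter($"prevAMT".isNotNull && ($"AMT" =!= $"prevAMT"))
  .select($"KEY", ($"AMT" - $"prevAMT").as("AMT"))

// KEYs with a single distinct AMT -> write one record as-is.
val unchanged = withId
  .groupBy("KEY")
  .agg(countDistinct("AMT").as("nDistinct"), first("AMT").as("AMT"))
  .filter($"nDistinct" === 1)
  .select("KEY", "AMT")

changes.union(unchanged).orderBy("KEY").show(false)

On the sample data this should yield (1, 34.6), (2, -70.0), (2, 10.5) and (3, 89.0), i.e. the expected output above.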
[Comments]:
2, 90.0 appears twice in the source dataframe but not in the expected dataframe. Is that the solution you want?
Yes, Philanthropist, that is what we expect... basically we need to capture any change in the AMT column.
[Answer 1]: I tried to solve it in pyspark rather than in Scala.
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

w1 = Window().partitionBy("key").orderBy("key")
DF4 = spark.createDataFrame([(1,34.6),(2,90.0),(2,90.0),(2,20.0),(2,30.5),(3,89.0),(3,89.0)], ["KEY", "AMT"])
DF4.createOrReplaceTempView('keyamt')

# Keys whose AMT never changes (a single distinct AMT) are written once, as-is.
DF7 = spark.sql('select distinct key,amt from keyamt where key in ( select key from (select key,count(distinct(amt)) dist from keyamt group by key) where dist=1)')

# For the remaining keys, pair each AMT with the previous AMT of the same key ...
DF8 = DF4.join(DF7, DF4['KEY'] == DF7['KEY'], 'leftanti').withColumn('new_col', (lag('AMT', 1).over(w1)).cast('double'))

# ... and compute the difference (null for the first row of each key).
DF9 = DF8.withColumn('new_col1', (DF8['AMT'] - DF8['new_col'].cast('double')))
DF9.withColumn('new_col1', (DF9['AMT'] - DF9['new_col'].cast('double'))).na.fill(0)

# Keep only the rows where AMT actually changed, append the unchanged keys, and display.
DF9.filter(DF9['new_col1'] != 0).select(DF9['KEY'], DF9['new_col1']).union(DF7).orderBy(DF9['KEY']).show()
Output:
+---+--------+
|KEY|new_col1|
+---+--------+
| 1| 34.6|
| 2| -70.0|
| 2| 10.5|
| 3| 89.0|
+---+--------+
[Comments]:
[Answer 2]: You can use a window function combined with when, lead, monotonically_increasing_id() for ordering, and the withColumn API, as shown below:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("KEY").orderBy("rowNo")
val tempdf = DF1.withColumn("rowNo", monotonically_increasing_id())
// Keep AMT as-is when there is no next row or the next AMT is unchanged; otherwise emit the difference to the next AMT.
val nextAmt = lead("AMT", 1).over(windowSpec)
tempdf.select($"KEY",
  when(nextAmt.isNull || (nextAmt - $"AMT") === lit(0.0), $"AMT")
    .otherwise(nextAmt - $"AMT").as("AMT")
).show(false)
[Comments]: