A 列和 B 列之间的流差由 C 列和 D 列汇总
Posted
技术标签:
【中文标题】A 列和 B 列之间的流差由 C 列和 D 列汇总【英文标题】:stream difference between column A and B aggregated by column C and D 【发布时间】:2020-02-29 11:22:18 【问题描述】:如何将以下内容流式传输到表格中:
C 列和 D 列汇总的 A 列和 B 列之间的差异。
+-------------+-------------------+--+-
| Column_A|Column_B |Column_C|Column_D|
+-------------+-------------------+--+-
|52 |67 |boy |car |
|44 |25 |girl |bike |
|98 |85 |boy |car |
|52 |41 |girl |car |
+-------------+-------------------+--+-
这是我的尝试,但它不起作用:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("append")\
.start()
我收到此错误:“GroupedData”对象没有属性“writeStream”
【问题讨论】:
【参考方案1】:取决于您希望如何聚合分组数据 - 例如,您可以这样做
先决条件(以防您尚未设置):
from pyspark.sql import functions as F
from pyspark.sql.functions import *
对于sum
:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))
对于max
:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))
然后:
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("append")\
.start()
关键是 - 如果你这样做 groupBy
你还需要通过聚合来减少。如果您想将值排序在一起,请尝试df.sort(...)
【讨论】:
如何将 C 列和 D 列聚合在一起? 你的意思是:difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C", "Column_D").agg(F.max(F.col("Difference")).alias("Difference_max"), F.min(F.col("Difference")).alias("Difference_min"))
?以上是关于A 列和 B 列之间的流差由 C 列和 D 列汇总的主要内容,如果未能解决你的问题,请参考以下文章