Spark Structured Streaming - 如何按最新和聚合计数进行重复数据删除
Posted
技术标签:
【中文标题】Spark Structured Streaming - 如何按最新和聚合计数进行重复数据删除【英文标题】:Spark Structured Streaming - How to deduplicate by latest and aggregate count 【发布时间】:2017-10-06 10:14:24 【问题描述】:我想使用窗口期执行结构化流式聚合。鉴于以下数据模式。目标是根据用户过滤最近发生的事件。然后汇总每个位置的每种事件类型的计数。
time location user type
1 A 1 one
2 A 1 two
1 B 2 one
2 B 2 one
1 A 3 two
1 A 4 one
示例输出:
location countOne countTwo
A 1 2
B 1 0
类似于以下内容:
val aggTypes = df
.select($"location", $"time", $"user", $"type")
.groupBy($"user")
.agg(max($"timestamp") as 'timestamp)
.select("*")
.withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
.groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location")
.agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo)))
.drop($"window")
由于结构化流不支持多个聚合,并且流数据帧/数据集不支持非基于时间的窗口。我不确定是否有可能在 1 个流式查询中实现所需的输出。
感谢任何帮助。
【问题讨论】:
【参考方案1】:似乎您正在尝试进行无状态聚合。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroups(org.apache.spark.api.java.function.FlatMapGroupsFunction,%20org.apache.spark.sql.Encoder)
flatMapGroups 是一个聚合 API,它将一个函数应用于数据集中的每个组。它仅在分组数据集上可用。flatMapGroups 不支持会增加 shuffle 开销的部分聚合。因此,仅使用此 API 进行适合内存的小批量聚合。还建议使用 reduce 函数或 Aggregator。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/expressions/Aggregator.html
val count = words.groupByKey(x => x)
.flatMapGroups
case (x, iterator) ⇒ Iterator((x, iterator.length))
.toDF("x", "count")
count.writeStream.format("console").outputMode(OutputMode.Append())
【讨论】:
以上是关于Spark Structured Streaming - 如何按最新和聚合计数进行重复数据删除的主要内容,如果未能解决你的问题,请参考以下文章
Spark Structured Streaming - 1
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?