Spark Structured Streaming - 如何按最新和聚合计数进行重复数据删除

Posted

技术标签:

【中文标题】Spark Structured Streaming - 如何按最新和聚合计数进行重复数据删除【英文标题】:Spark Structured Streaming - How to deduplicate by latest and aggregate count 【发布时间】:2017-10-06 10:14:24 【问题描述】:

我想使用窗口期执行结构化流式聚合。鉴于以下数据模式。目标是根据用户过滤最近发生的事件。然后汇总每个位置的每种事件类型的计数。

time    location   user   type
 1        A         1      one
 2        A         1      two
 1        B         2      one
 2        B         2      one
 1        A         3      two
 1        A         4      one

示例输出:

location   countOne   countTwo
    A          1         2
    B          1         0

类似于以下内容:

val aggTypes = df
  .select($"location", $"time", $"user", $"type")
  .groupBy($"user")
  .agg(max($"timestamp") as 'timestamp)
  .select("*")
  .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
  .groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location")
  .agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo)))
  .drop($"window")

由于结构化流不支持多个聚合,并且流数据帧/数据集不支持非基于时间的窗口。我不确定是否有可能在 1 个流式查询中实现所需的输出。

感谢任何帮助。

【问题讨论】:

【参考方案1】:

似乎您正在尝试进行无状态聚合。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroups(org.apache.spark.api.java.function.FlatMapGroupsFunction,%20org.apache.spark.sql.Encoder)

flatMapGroups 是一个聚合 API,它将一个函数应用于数据集中的每个组。它仅在分组数据集上可用。flatMapGroups 不支持会增加 shuffle 开销的部分聚合。因此,仅使用此 API 进行适合内存的小批量聚合。还建议使用 reduce 函数或 Aggregator。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/expressions/Aggregator.html

val count = words.groupByKey(x => x)
            .flatMapGroups
             
              case (x, iterator) ⇒ Iterator((x, iterator.length))
              .toDF("x", "count")        


count.writeStream.format("console").outputMode(OutputMode.Append())

【讨论】:

以上是关于Spark Structured Streaming - 如何按最新和聚合计数进行重复数据删除的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming

Spark Structured Streaming

Spark Structured Streaming - 1

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录