Spark Structured Streaming 中的多重聚合和 Distinct 函数

Posted

技术标签:

【中文标题】Spark Structured Streaming 中的多重聚合和 Distinct 函数【英文标题】:Multiple aggregations and Distinct Function in Spark Structured Streaming 【发布时间】:2020-02-20 23:26:47 【问题描述】:

我需要对来自 Kafka 的流数据进行一些聚合,并每隔 M 秒将前 10 行结果输出到控制台。

    input_df = (
       spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", brokers)
       .option("subscribe", "page_views")
       .load()
       .selectExpr('cast(value as string)')
    )

    ...
    ...

    # info has 2 cols: domain, uid  (info = transformation of input_df)
    # It's an example of what I want to do (like in simple pyspark)
    stat = (
        info
        .groupby('domain')
        .agg(
             F.count(F.col('UID')).alias('view'),
             F.countDistinct(F.col('UID')).alias('unique')
        )
        .sort(F.col("view").desc())
        .limit(10)
    )

    query = (
        stat
        .writeStream
        .outputMode("complete")
        .format("console")
        .option("truncate", "true")
        .start()
    )

这个例子没有时间触发,但我可以自己做。 因为不允许使用countDistinct,所以我没有做练习的想法。 我尝试为每个聚合(df_1 =(域,视图),df_2 =(域,唯一))创建 2 个 dfs,然后将 df_1 与 df_2 连接,但也不允许有多个聚合。所以这对我来说是死胡同。 做出决定会很酷。

感谢您的关注!

【问题讨论】:

【参考方案1】:

你可以通过flatMapGroupWithState来实现,它是任意状态函数。此外,它支持追加模式和更新模式。

【讨论】:

以上是关于Spark Structured Streaming 中的多重聚合和 Distinct 函数的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming

Spark Structured Streaming

Spark Structured Streaming - 1

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录