Spark Structured Streaming 中的多重聚合和 Distinct 函数
Posted
技术标签:
【中文标题】Spark Structured Streaming 中的多重聚合和 Distinct 函数【英文标题】:Multiple aggregations and Distinct Function in Spark Structured Streaming 【发布时间】:2020-02-20 23:26:47 【问题描述】:我需要对来自 Kafka 的流数据进行一些聚合,并每隔 M 秒将前 10 行结果输出到控制台。
input_df = (
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "page_views")
.load()
.selectExpr('cast(value as string)')
)
...
...
# info has 2 cols: domain, uid (info = transformation of input_df)
# It's an example of what I want to do (like in simple pyspark)
stat = (
info
.groupby('domain')
.agg(
F.count(F.col('UID')).alias('view'),
F.countDistinct(F.col('UID')).alias('unique')
)
.sort(F.col("view").desc())
.limit(10)
)
query = (
stat
.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "true")
.start()
)
这个例子没有时间触发,但我可以自己做。 因为不允许使用countDistinct,所以我没有做练习的想法。 我尝试为每个聚合(df_1 =(域,视图),df_2 =(域,唯一))创建 2 个 dfs,然后将 df_1 与 df_2 连接,但也不允许有多个聚合。所以这对我来说是死胡同。 做出决定会很酷。
感谢您的关注!
【问题讨论】:
【参考方案1】:你可以通过flatMapGroupWithState来实现,它是任意状态函数。此外,它支持追加模式和更新模式。
【讨论】:
以上是关于Spark Structured Streaming 中的多重聚合和 Distinct 函数的主要内容,如果未能解决你的问题,请参考以下文章
Spark Structured Streaming - 1
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?