Spark 结构化流式传输 - 为不同的 GroupBy 键使用不同的 Windows

Posted

技术标签:

【中文标题】Spark 结构化流式传输 - 为不同的 GroupBy 键使用不同的 Windows【英文标题】:Spark Structured streaming- Using different Windows for different GroupBy Keys 【发布时间】:2017-11-15 10:30:12 【问题描述】:

目前我通过 Spark 结构化流从 Kafka 主题中读取后得到下表

key,timestamp,value  
-----------------------------------
key1,2017-11-14 07:50:00+0000,10    
key1,2017-11-14 07:50:10+0000,10  
key1,2017-11-14 07:51:00+0000,10    
key1,2017-11-14 07:51:10+0000,10    
key1,2017-11-14 07:52:00+0000,10    
key1,2017-11-14 07:52:10+0000,10  

key2,2017-11-14 07:50:00+0000,10  
key2,2017-11-14 07:51:00+0000,10  
key2,2017-11-14 07:52:10+0000,10  
key2,2017-11-14 07:53:00+0000,10  

我想为每个键使用不同的窗口并执行聚合

例如 key1 将在 1 分钟的窗口上聚合以产生

key,window,sum
------------------------------------------
key1,[2017-11-14 07:50:00+0000,2017-11-14 07:51:00+0000],20  
key1,[2017-11-14 07:51:00+0000,2017-11-14 07:52:00+0000],20  
key1,[2017-11-14 07:52:00+0000,2017-11-14 07:53:00+0000],20  

key2 将在 2 分钟的窗口内聚合以产生

key,window,sum
------------------------------------------
key2,[2017-11-14 07:50:00+0000,2017-11-14 07:52:00+0000],20  
key2,[2017-11-14 07:52:00+0000,2017-11-14 07:54:00+0000],20  

目前我在做以下事情

var l1 = List(List(key1,"60 seconds"),List(key2,"120 seconds"))  
l1.foreachlist => 

    val filtered_df = df.filter($"key" === list(0))

    val windowedPlantSum = filtered_df
        .withWatermark("timestamp", "120 minutes")
        .groupBy(
          window($"timestamp", list(1)),
          $"key"
        )
        .agg(sum("value").alias("sum")

    //start the stream

上述方法启动 2 个单独的流。在我的例子中,有 200 个这样的键启动了 200 个由于内存问题而失败的流。

在 Spark 结构化流中,有什么方法可以根据 Keys 指定窗口,还是有其他方法?

【问题讨论】:

【参考方案1】:

我猜你必须使用mapGroupsWithState 来管理一个查询

来自幻灯片 28:https://www.slideshare.net/databricks/arbitrary-stateful-aggregations-using-structured-streaming-in-apache-spark

还有:

Arbitrary Stateful Processing in Apache Spark’s Structured Streaming Deep dive stateful stream processing Official documentation

【讨论】:

以上是关于Spark 结构化流式传输 - 为不同的 GroupBy 键使用不同的 Windows的主要内容,如果未能解决你的问题,请参考以下文章

嵌套 json 中的结构化流式传输不同模式

如何在不中断流式传输作业的情况下更改 spark spark 流式事件中的 json 架构?

流式传输 Kmeans Spark JAVA

Spark 流式传输 Kafka 消息未使用

使用 Spark 流式传输的 Redshift

Spark 结构化流文件源起始偏移量