Flink Streaming Python API - reduce() 产生增量结果而不是最终值

Posted

技术标签:

【中文标题】Flink Streaming Python API - reduce() 产生增量结果而不是最终值【英文标题】:Flink Streaming Python API - reduce() produces incremental results instead of a final value 【发布时间】:2019-05-13 12:49:52 【问题描述】:

我正在尝试使用 Python API 进行流式传输在 Flink 上实现 Kmeans 聚类算法。我正在根据第 0 个索引执行 key_by,然后尝试在每个组上使用 reduce() 以获得一种计数聚合。

class CentroidAccumulator(ReduceFunction):                                                                                                                                       
    def reduce(self, val1, val2):                                                                                                                                                
        id1, point1, count1 =  val1                                                                                                                                              
        id2, point2, count2 =  val2                                                                                                                                              
        return (id1, point1.add(point2), count1+count2)   

class Selector(KeySelector):                                                                                                                                                     
    def getKey(self, value):                                                                                                                                                     
        return value[0]   


nearest_points = points \                                                                                                                                                
                .map(SelectNearestPoint(centroids)) \                                                                                                                            
                .key_by(Selector()).reduce(CentroidAccumulator()) 
nearest_points.write_as_text("output.txt")

预期结果:

(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)

实际结果:

我得到了写入文件的所有迭代的输出(我正在测试的样本中有 40 个点,因此输出有 40 行这样的)

(1, <kmeans_clustering.Point instance at 0x2>, 1)                                                                                                                                
(3, <kmeans_clustering.Point instance at 0x3>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x4>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x5>, 2)                                                                                                                                
.
.
.                                                                                                                
(2, <kmeans_clustering.Point instance at 0x20>, 13)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x21>, 14)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x22>, 10)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x23>, 4)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x24>, 15)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x25>, 16)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x26>, 11)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x27>, 5)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x28>, 17)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x29>, 18) 

问题是它正在减少,但我只想获得每个组的 reduce 转换的最后一个值(据我所知,reduce 应该如何工作)。我做错了什么?

【问题讨论】:

【参考方案1】:

你没有做错任何事;这是流式减少功能的预期行为。从概念上讲,数据流是无穷无尽的数据流——因此“等到最后”才产生结果是没有意义的。流媒体程序的标准行为是为每个事件生成一个结果。

当然,这可能有点不方便。如果你只想看到最终的结果,那么必须有某种方式表明结束已经到来。对于批处理程序,这很自然。对于流式应用程序,有限数据源发送一个值为 MAX_WATERMARK 的水印,可用于检测输入是否已到达终点——您可以在带有事件时间计时器的 ProcessFunction 中捕捉到这一点,但这是一个有点复杂的解决方案。您还可以使用 windows 来实现一种解决方法。

【讨论】:

有道理。使用.key_by(Selector()) .time_window(milliseconds(50)) .reduce(CentroidAccumulator()) 工作我有点不喜欢在 ProcessFunction 解决方案中使用捕获。

以上是关于Flink Streaming Python API - reduce() 产生增量结果而不是最终值的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink -Streaming(DataStream API)

Flink SQL管理平台flink-streaming-platform-web安装搭建

Kafka + Flink 出现异常 java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/Des

FlinkFlink SQL 开源UI平台 flink-streaming-platform-web

ClassNotFoundException:org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 同时消费一个 kafka 主题

翻译Flink Table Api & SQL —Streaming 概念 ——时间属性