Flink Streaming Python API - reduce() 产生增量结果而不是最终值
Posted
技术标签:
【中文标题】Flink Streaming Python API - reduce() 产生增量结果而不是最终值【英文标题】:Flink Streaming Python API - reduce() produces incremental results instead of a final value 【发布时间】:2019-05-13 12:49:52 【问题描述】:我正在尝试使用 Python API 进行流式传输在 Flink 上实现 Kmeans 聚类算法。我正在根据第 0 个索引执行 key_by
,然后尝试在每个组上使用 reduce()
以获得一种计数聚合。
class CentroidAccumulator(ReduceFunction):
def reduce(self, val1, val2):
id1, point1, count1 = val1
id2, point2, count2 = val2
return (id1, point1.add(point2), count1+count2)
class Selector(KeySelector):
def getKey(self, value):
return value[0]
nearest_points = points \
.map(SelectNearestPoint(centroids)) \
.key_by(Selector()).reduce(CentroidAccumulator())
nearest_points.write_as_text("output.txt")
预期结果:
(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)
实际结果:
我得到了写入文件的所有迭代的输出(我正在测试的样本中有 40 个点,因此输出有 40 行这样的)
(1, <kmeans_clustering.Point instance at 0x2>, 1)
(3, <kmeans_clustering.Point instance at 0x3>, 1)
(2, <kmeans_clustering.Point instance at 0x4>, 1)
(2, <kmeans_clustering.Point instance at 0x5>, 2)
.
.
.
(2, <kmeans_clustering.Point instance at 0x20>, 13)
(2, <kmeans_clustering.Point instance at 0x21>, 14)
(1, <kmeans_clustering.Point instance at 0x22>, 10)
(4, <kmeans_clustering.Point instance at 0x23>, 4)
(2, <kmeans_clustering.Point instance at 0x24>, 15)
(2, <kmeans_clustering.Point instance at 0x25>, 16)
(1, <kmeans_clustering.Point instance at 0x26>, 11)
(4, <kmeans_clustering.Point instance at 0x27>, 5)
(2, <kmeans_clustering.Point instance at 0x28>, 17)
(2, <kmeans_clustering.Point instance at 0x29>, 18)
问题是它正在减少,但我只想获得每个组的 reduce 转换的最后一个值(据我所知,reduce 应该如何工作)。我做错了什么?
【问题讨论】:
【参考方案1】:你没有做错任何事;这是流式减少功能的预期行为。从概念上讲,数据流是无穷无尽的数据流——因此“等到最后”才产生结果是没有意义的。流媒体程序的标准行为是为每个事件生成一个结果。
当然,这可能有点不方便。如果你只想看到最终的结果,那么必须有某种方式表明结束已经到来。对于批处理程序,这很自然。对于流式应用程序,有限数据源发送一个值为 MAX_WATERMARK 的水印,可用于检测输入是否已到达终点——您可以在带有事件时间计时器的 ProcessFunction 中捕捉到这一点,但这是一个有点复杂的解决方案。您还可以使用 windows 来实现一种解决方法。
【讨论】:
有道理。使用.key_by(Selector()) .time_window(milliseconds(50)) .reduce(CentroidAccumulator())
工作我有点不喜欢在 ProcessFunction 解决方案中使用捕获。以上是关于Flink Streaming Python API - reduce() 产生增量结果而不是最终值的主要内容,如果未能解决你的问题,请参考以下文章
Apache Flink -Streaming(DataStream API)
Flink SQL管理平台flink-streaming-platform-web安装搭建
Kafka + Flink 出现异常 java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/Des
FlinkFlink SQL 开源UI平台 flink-streaming-platform-web
ClassNotFoundException:org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 同时消费一个 kafka 主题