Flink 滑动计数窗口行为

Posted

技术标签:

【中文标题】Flink 滑动计数窗口行为【英文标题】:Flink Sliding count window behavior 【发布时间】:2018-08-25 06:11:13 【问题描述】:

假设我们有这样的数据结构:

Tuple2<ArryaList<Long>, Integer>

第一个字段是长度为 1 的 ArrayList,其中包含时间戳,而整数字段是一个介于 1 和 40 之间的数字,名为 channel。目标是使用相同的键 (channel) 聚合每 400 条消息,并在它们上应用 ReduceFunction(它只是将 400 条消息的时间戳合并到元组的第一个字段中)。 我将channel 字段设置为消息的键并创建一个400 的计数窗口。例如,如果我们有160000 消息作为输入,它应该输出160000/400 = 400 行和计数窗口按需要工作。问题是当我使用滑动计数窗口时,我的预期行为是:

Flink 为每个channel 数字创建逻辑窗口,并应用ReduceFunction 第一次,如果逻辑窗口的长度达到400,之后每100个输入数据,相同key 作为逻辑窗口的键,也会调用ReduceFunction 来表示窗口中的最后 400 条消息,所以我们应该有:

160000 - 400 = 159600 // 第一个 400 输入将第一次调用 reduce 函数 159600 / 100 = 1596 // 在前 400 个输入之后,对于每 100 个输入,Flink 调用最后 400 个输入的 reduce 函数 1 + 1596 = 1597 // 输出行数

但运行滑动计数窗口,它会输出 1600 行长度可变的行。 (我预计输出的长度只有 400)

要点:length我是指ArrayList的大小(Tuple2的第一个字段)

前40个通道-->长度为100 第二个40通道-->长度为299 第三个40通道-->长度598 第四个40通道-->长度997 剩下40个通道-->长度为400

我如何证明这种行为的合理性并实现我想要的滑动计数窗口?

这里是源代码:

DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
                 .reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() 
             @Override
             public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception 
                 t0.f0.add(t1.f0.get(0));
                 return t0;
             
         ).writeAsText("results400").setParallelism(1);

更新:根据@DavidAnderson 的建议,我也尝试在ReduceFunstion 中创建一个新的元组,而不是修改t0,但结果是一样的。

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception 
                         ArrayList<Long> times = t0.f0;

                         times.addAll(t1.f0);

                         return new Tuple2<>(times, t0.f1) ;
                     

【问题讨论】:

你有没有试过让reduce函数创建一个新的元组,而不是修改t0? @DavidAnderson 我根据您的建议更新了问题。它产生了相同的输出。 @DavidAnderson 您能否详细说明 Flink 中滑动计数窗口的行为?例如对于countWindow(400, 100),根据Sliding Windows 的定义,我希望它为最后400 条消息调用ReduceFunction,每100 条输入消息。我说的对吗? 【参考方案1】:

这是countWindow的实现

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) 
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));

它的行为与您期望的不太一样。窗口每 100 个元素(幻灯片)触发一次,无论它是否包含 400 个元素(大小)。大小控制最多保留多少个元素。

【讨论】:

【参考方案2】:

感谢David Anderson的建议,将ReduceFunction修改为以下解决问题。我们应该在ReduceFunction 中创建一个新对象:

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception 
                         ArrayList<Long> times = new ArrayList<>();

                         times.addAll(t0.f0);
                         times.addAll(t1.f0);


                         return new Tuple2<>(times, t0.f1) ;
                     

请注意,问题中的两种 reduce 方法都会导致输出不正确。 现在输出如下:

前40个通道-->长度为100 第二个40通道-->长度为200 第三个40通道-->长度300 每40个通道剩余-->长度为400

所以 Flink 滑动计数窗口的行为是它调用ReduceFunction 每个滑动计数输入消息。所以在我们有 160000 条输入消息的情况下,结果编号应该是: 160000/100 = 1600

【讨论】:

以上是关于Flink 滑动计数窗口行为的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink 两种类型的窗口,时间和“计数”窗口

flink计数不同的问题

「Flink」使用Managed Keyed State实现计数窗口功能

R语言计算时间窗口内的统计值(滑动平均滑动最大值滑动中位数滑动计数滑动总和等)实战

Hive SQL,在滑动 10 分钟窗口中找到最大计数

flink流计算随笔