Flink Window

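Flink is a computing framework that supports both stream processing and batch processing, and windows are the bridge between the two. This article explains windows in detail.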

1 A First Look at Windows

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations.

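Reference [1] explains that a window splits an infinite stream into finite "buckets". Every window operates on a KeyedStream, which can be explicit or implicit. In the implicit case (windowAll), Flink first keys all records with keyBy(new NullByteKeySelector<T>()), so every record gets the same key and the window logic runs in a single, non-parallel task. The two forms look like this: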

Explicitly keyed stream (keyBy):

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Implicitly keyed stream (windowAll):

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Calls in square brackets [...] are optional.
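To make the explicit/implicit distinction concrete, here is a simplified plain-Java sketch (no Flink dependency) of how keyed records are spread over parallel window subtasks. It assumes a hypothetical routing rule, key.hashCode() modulo the parallelism; Flink actually routes through murmur-hashed key groups, but the effect shown is the same: with a constant key, which is what windowAll gets from NullByteKeySelector, every record lands in one subtask.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

final class KeyRoutingSketch {

    /** Counts how many records each parallel subtask receives under a given key selector. */
    static <T> Map<Integer, Integer> recordsPerSubtask(List<T> records, Function<? super T, ?> keySelector, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (T record : records) {
            Object key = keySelector.apply(record);
            // Hypothetical routing rule: non-negative hash of the key modulo the parallelism.
            int subtask = Math.floorMod(key.hashCode(), parallelism);
            counts.merge(subtask, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "c", "d", "a", "b");
        // Explicit keyBy: records are spread over the subtasks by key.
        System.out.println(recordsPerSubtask(words, w -> w, 4));        // prints {0=1, 1=2, 2=2, 3=1}
        // windowAll: a constant key (like NullByteKeySelector's 0), so a single subtask gets everything.
        System.out.println(recordsPerSubtask(words, w -> (byte) 0, 4)); // prints {0=6}
    }
}
```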

2 Window Components

2.1 WindowAssigner

A WindowAssigner assigns each incoming record to zero or more windows, and different assigners produce different types of windows. Flink currently provides the following window types:

2.1.1 Tumbling Window

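Tumbling windows use TumblingEventTimeWindows or TumblingProcessingTimeWindows as the WindowAssigner. The resulting windows are shown in the figure below: they have a fixed size and do not overlap, so every record belongs to exactly one window.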
[Figure: Tumbling Window]

DataStream<T> input = ...;
// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
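Placing a record into a tumbling window needs only arithmetic. The plain-Java sketch below computes the start of the tumbling window that contains a timestamp; to my understanding this is the same formula Flink applies in TimeWindow.getWindowStartWithOffset, but the class and method names here are hypothetical. It also shows why an offset of -8 hours aligns daily windows with midnight in UTC+8 (for example China Standard Time).

```java
final class TumblingWindowSketch {

    /** Start of the tumbling window of length `size` (ms), shifted by `offset` (ms), that contains `timestamp`. */
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % keeps the sign of the dividend, so normalize negative remainders into [0, size).
        if (remainder < 0) {
            remainder += size;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 5_000L;                        // like TumblingEventTimeWindows.of(Time.seconds(5))
        long start = windowStart(12_345L, 0L, size);
        System.out.println("[" + start + ", " + (start + size) + ")"); // prints [10000, 15000)

        long day = 24L * 60 * 60 * 1000;           // like Time.days(1)
        long minus8h = -8L * 60 * 60 * 1000;       // like Time.hours(-8)
        // Epoch 0 is 08:00 in UTC+8; its daily window starts at -28800000 ms, i.e. 00:00 in UTC+8.
        System.out.println(windowStart(0L, minus8h, day)); // prints -28800000
    }
}
```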

2.1.2 Sliding Window

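Sliding windows use SlidingEventTimeWindows or SlidingProcessingTimeWindows as the WindowAssigner. Each window has a fixed size, but a new window starts every slide interval, so when the slide is smaller than the size the windows overlap and a record can belong to several windows, as shown below. For example, with a 10-second size and a 5-second slide, every record falls into two windows.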

DataStream<T> input = ...;
// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
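Assigning a record to sliding windows is again simple arithmetic: find the latest window start on the slide grid at or before the timestamp, then step back one slide at a time while the window still covers the timestamp. The plain-Java sketch below follows that idea; I believe SlidingEventTimeWindows works essentially this way, but the names here are hypothetical and the sketch is not Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {

    /** Latest slide boundary at or before `timestamp` (same normalization as for tumbling windows). */
    static long lastStart(long timestamp, long offset, long slide) {
        long remainder = (timestamp - offset) % slide;
        if (remainder < 0) {
            remainder += slide;
        }
        return timestamp - remainder;
    }

    /** Starts of every window [start, start + size) that contains `timestamp`, latest first. */
    static List<Long> windowStarts(long timestamp, long offset, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Walk backwards one slide at a time while the window still reaches past the timestamp.
        for (long start = lastStart(timestamp, offset, slide); start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10_000L, slide = 5_000L;       // like SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
        for (long start : windowStarts(12_000L, 0L, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
        // prints [10000, 20000) then [5000, 15000)
    }
}
```

With size 10 s and slide 5 s the loop always yields size / slide = 2 windows, matching the overlap described above.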

2.1.3 Session Window

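Session windows use EventTimeSessionWindows (or ProcessingTimeSessionWindows) as the WindowAssigner; the resulting windows are shown below. The session window assigner first creates a separate window for every record and then merges the windows that overlap, so records that are less than one session gap apart end up in the same session. The gap can be static (withGap) or computed per element (withDynamicGap).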

DataStream<T> input = ...;
// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> { /* determine and return session gap */ }))
    .<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { /* determine and return session gap */ }))
    .<windowed transformation>(<window function>);
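The create-then-merge behaviour can be sketched in plain Java for one key and a static gap. This is a simplified, batch-style illustration with hypothetical names: it sorts all timestamps up front, whereas Flink merges incrementally as records arrive. Each record starts as its own window [ts, ts + gap), and a window that overlaps or touches the previous session extends that session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionWindowSketch {

    /** Merged session windows as half-open {start, end} pairs, for one key and a static gap (ms). */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            // Step 1: every record starts with its own window [ts, ts + gap).
            long start = ts;
            long end = ts + gap;
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            // Step 2: if it overlaps or touches the previous session, extend that session instead.
            if (last != null && start <= last[1]) {
                last[1] = Math.max(last[1], end);
            } else {
                merged.add(new long[] {start, end});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1_000L, 3_000L, 20_000L}, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 8000) and [20000, 25000)
    }
}
```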

2.1.4 Global Window

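A global window uses GlobalWindows as the WindowAssigner: all elements with the same key are placed in one single window, as shown below. A global window never ends by itself and its default trigger never fires, so it is only useful together with a custom trigger.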

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);
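Because a global window never closes on its own, it is typically paired with a trigger; to my knowledge, KeyedStream.countWindow(n) is built from GlobalWindows plus a purging count trigger. The plain-Java sketch below simulates that combination for a single key with hypothetical names: elements accumulate in the one window, and every n-th element fires the window function (a sum here) and then purges the contents.

```java
import java.util.ArrayList;
import java.util.List;

final class GlobalCountWindowSketch {
    private final int n;                                  // fire every n elements
    private final List<Long> window = new ArrayList<>();  // the single global window of one key
    final List<Long> results = new ArrayList<>();         // output of the window function

    GlobalCountWindowSketch(int n) {
        this.n = n;
    }

    void onElement(long value) {
        window.add(value);
        if (window.size() >= n) {
            // Count trigger fires: evaluate the window function on the buffered elements...
            long sum = 0;
            for (long v : window) {
                sum += v;
            }
            results.add(sum);
            // ...and purge, so the next count starts from an empty window.
            window.clear();
        }
    }

    public static void main(String[] args) {
        GlobalCountWindowSketch w = new GlobalCountWindowSketch(3);
        for (long v = 1; v <= 7; v++) {
            w.onElement(v);
        }
        System.out.println(w.results); // prints [6, 15]; element 7 is still waiting in the window
    }
}
```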

2.2 WindowFunction

After the window is defined, you attach the processing logic, that is, the window function. Flink ships several commonly used window functions and also supports custom ones.

2.2.1 Built-in Window Functions

1 ReduceFunction

DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    // keep the key (f0) and sum the Long field (f1) of two elements
    .reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));
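A ReduceFunction combines two values of the same type into one, so Flink can keep a single running value per key and window instead of buffering every element. The plain-Java sketch below shows that incremental behaviour for one window, with Map.merge standing in for the reduce call; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IncrementalReduceSketch {

    /** Reduces (key, value) pairs of one window incrementally, keeping one running sum per key. */
    static Map<String, Long> reduceWindow(String[] keys, long[] values) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // Same logic as the reduce above: keep the key, add the Long fields.
            state.merge(keys[i], values[i], Long::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(reduceWindow(new String[] {"a", "b", "a", "a"}, new long[] {1L, 10L, 2L, 3L}));
        // prints {a=6, b=10}
    }
}
```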

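2 AggregateFunction

// Averages the Long field (f1): the accumulator holds (sum, count).
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }
    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }
    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }
    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());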
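The contract behind AverageAggregate can be exercised without Flink. The hypothetical plain-Java version below replaces Tuple2<Long, Long> with a long[] {sum, count} and walks through the four methods; to my understanding, merge is what Flink calls when windows are combined, as happens with session windows.

```java
final class AverageAggregateSketch {
    // Accumulator {sum, count}, mirroring the Tuple2<Long, Long> used above.
    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    static long[] add(long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1};
    }

    static double getResult(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    public static void main(String[] args) {
        // Two partial accumulators, e.g. from two session windows that are about to be merged.
        long[] left = add(4L, add(2L, createAccumulator()));   // {6, 2}
        long[] right = add(9L, createAccumulator());           // {9, 1}
        System.out.println(getResult(merge(left, right)));     // prints 5.0
    }
}
```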

3 FoldFunction
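DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    // concatenate the Long fields of all elements, starting from the empty string
    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
        public String fold(String acc, Tuple2<String, Long> value) {
            return acc + value.f1;
        }
    });

FoldFunction is deprecated in favor of AggregateFunction and has been removed in newer Flink releases.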
2.2.2 Custom Window Functions

DataStream<Tuple2<String, Long>> input = ...;
input.keyBy(<key selector>)
    .window(<window assigner>)
    .process(new MyProcessWindowFunction()); // a custom window function that counts elements per window
public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        long count = 0;
        for (Tuple2<String, Long> in : input) { count++; } // count the elements in this window
        out.collect("Window: " + context.window() + " count: " + count);
    }
}

2.3 Trigger

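A Trigger determines when a window is ready to be processed, that is, when the window function (the user-defined logic) is executed. It has three main methods: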

public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

onElement is called for every element added to a window, onProcessingTime is called when a registered processing-time timer fires, and onEventTime is called when a registered event-time timer fires.

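Each method returns a TriggerResult: CONTINUE, FIRE, PURGE, or FIRE_AND_PURGE. CONTINUE does nothing; FIRE evaluates the window function but keeps the window's elements; PURGE clears the window's elements without evaluating the function; FIRE_AND_PURGE evaluates the function and then clears the elements.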
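The four results differ only in two flags, whether to fire and whether to purge; as far as I know, Flink's TriggerResult exposes the same pair through isFire() and isPurge(). The plain-Java sketch below uses a hypothetical enum with the same constant names and applies each result to a buffered window whose function is a simple sum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's TriggerResult; only the names and meanings follow the text above.
enum TriggerResultSketch {
    CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

    final boolean fire;
    final boolean purge;

    TriggerResultSketch(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

final class WindowBufferSketch {
    final List<Long> elements = new ArrayList<>(); // contents of one window
    final List<Long> emitted = new ArrayList<>();  // results produced by the "window function"

    /** Applies a trigger result: optionally run the window function (a sum), then optionally clear. */
    void apply(TriggerResultSketch r) {
        if (r.fire) {
            long sum = 0;
            for (long e : elements) {
                sum += e;
            }
            emitted.add(sum);
        }
        if (r.purge) {
            elements.clear();
        }
    }

    public static void main(String[] args) {
        WindowBufferSketch w = new WindowBufferSketch();
        w.elements.add(1L);
        w.elements.add(2L);
        w.apply(TriggerResultSketch.FIRE);           // emits 3, keeps the elements
        w.elements.add(4L);
        w.apply(TriggerResultSketch.FIRE_AND_PURGE); // emits 7, then clears the window
        w.apply(TriggerResultSketch.CONTINUE);       // nothing happens
        System.out.println(w.emitted + " " + w.elements); // prints [3, 7] []
    }
}
```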

2.4 Other Components

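  • Evictor: after the Trigger fires, an Evictor can remove elements from the window before and/or after the window function runs.

  • Allowed lateness: whether data that arrives late is still accepted, and how late it may be (set with allowedLateness; zero by default).

  • Side output: what happens to data that arrives after the watermark has passed the window end plus the allowed lateness. By default such data is dropped; with sideOutputLateData it is emitted to a side output instead.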
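The timing rules behind allowed lateness and the late-data side output can be written as a small classification. The sketch below is a simplification, not Flink code, and its names are hypothetical: it assumes an event-time window [start, end) that fires once the watermark reaches end - 1 (the default event-time trigger behaviour, to my understanding) and whose state is discarded once the watermark reaches end - 1 + allowed lateness.

```java
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ALLOWED, TOO_LATE }

    /** Classifies a record that falls into the window [start, windowEnd), given the current watermark. */
    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;               // last timestamp inside the window
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME;                      // the window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ALLOWED;             // the window fires again including this record
        }
        return Outcome.TOO_LATE;                         // dropped, or sent to the late-data side output
    }

    public static void main(String[] args) {
        long end = 10_000L, lateness = 2_000L;
        System.out.println(classify(end, lateness, 5_000L));  // prints ON_TIME
        System.out.println(classify(end, lateness, 10_500L)); // prints LATE_BUT_ALLOWED
        System.out.println(classify(end, lateness, 12_500L)); // prints TOO_LATE
    }
}
```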

3 The Window Lifecycle

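The sections above introduced the basic concepts and components of windows. This section walks through the entire lifecycle of a window, using an event-time sliding window as the example.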

DataStream<T> input = ...;
// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

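This code measures time by event time: a new window starts every 5 seconds, and each window spans 10 seconds. When the watermark received by the operator passes the end of a window (with the default event-time trigger, once it reaches the window's last timestamp, end - 1), the window's trigger fires and the window function is executed.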
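The lifecycle described above can be simulated in plain Java for a single key, using the same 10-second size and 5-second slide. This is a simplified model with hypothetical names: records are counted per window, a window fires once the watermark reaches end - 1, and it is removed right after firing because the allowed lateness is zero. Late records, offsets, and processing time are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SlidingLifecycleSketch {
    static final long SIZE = 10_000L;   // Time.seconds(10)
    static final long SLIDE = 5_000L;   // Time.seconds(5)

    final TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> element count
    final List<String> fired = new ArrayList<>();

    /** Assigns one record to every sliding window containing it (offset 0, timestamps >= 0 assumed). */
    void onElement(long timestamp) {
        long lastStart = timestamp - timestamp % SLIDE;
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            openWindows.merge(start, 1, Integer::sum);
        }
    }

    /** Fires, then drops, every window whose last timestamp (end - 1) is covered by the watermark. */
    void onWatermark(long watermark) {
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Integer> first = openWindows.firstEntry(); // earliest start = earliest end
            long end = first.getKey() + SIZE;
            if (end - 1 > watermark) {
                break;                                      // this window is not complete yet
            }
            fired.add("[" + first.getKey() + ", " + end + ") count=" + first.getValue());
            openWindows.pollFirstEntry();
        }
    }

    public static void main(String[] args) {
        SlidingLifecycleSketch op = new SlidingLifecycleSketch();
        op.onElement(3_000L);
        op.onElement(7_000L);
        op.onWatermark(9_999L);   // fires [-5000, 5000) and [0, 10000)
        op.onElement(12_000L);
        op.onWatermark(20_000L);  // fires [5000, 15000) and [10000, 20000)
        op.fired.forEach(System.out::println);
    }
}
```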

The overall processing of a window roughly proceeds as follows:


[1] Windows, Apache Flink documentation

