Flink Window
Posted 心智成熟之路
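Flink is a computing framework that supports both stream processing and batch processing, and Window is the bridge between the two. This article covers Window in detail.
1 A First Look at Window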
Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations.
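As reference [1] explains, a Window splits an infinite stream into finite chunks. Every Window is built on top of a KeyedStream, either explicitly or implicitly. In the implicit case, all records are first keyed with keyBy(new NullByteKeySelector<T>()), so they all share a single key.
The two forms differ as follows:
Explicit KeyedStream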
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Implicit KeyedStream
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
The operations in square brackets […] above are optional.
2 Window Components
2.1 WindowAssigner
A WindowAssigner assigns each record to zero or more Windows. Different assigners produce different Window types. Flink currently provides the following Window types:
2.1.1 Tumbling Window
Uses TumblingEventTimeWindows / TumblingProcessingTimeWindows as the WindowAssigner. Each message belongs to exactly one Window.
Figure: Tumbling Window
DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
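To make the "exactly one Window" property concrete, here is a minimal plain-Java sketch of how a tumbling window start could be derived from a timestamp, a window size, and an offset. The class and method names (TumblingWindowMath, windowStart) are hypothetical and this is not Flink's source code; it only illustrates the alignment arithmetic, assuming all values are in milliseconds.

```java
public class TumblingWindowMath {
    /**
     * Start of the tumbling window that contains the timestamp.
     * Assumption: windows are aligned to the epoch, shifted by offset.
     */
    static long windowStart(long timestamp, long size, long offset) {
        // floorMod keeps the remainder non-negative, so negative offsets also work
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long start = windowStart(12_345L, size, 0L);
        // prints [10000, 15000)
        System.out.println("[" + start + ", " + (start + size) + ")");

        // Daily window with a -8 hour offset: the window containing epoch 0
        // starts 8 hours earlier, i.e. at local midnight in UTC+8.
        long day = 24L * 60 * 60 * 1000;
        long eightHours = 8L * 60 * 60 * 1000;
        // prints -28800000
        System.out.println(windowStart(0L, day, -eightHours));
    }
}
```

Because every timestamp maps to a single start value, no element can fall into two tumbling windows.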
2.1.2 Sliding Window
Uses SlidingEventTimeWindows / SlidingProcessingTimeWindows as the WindowAssigner. A message may belong to more than one Window.
DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
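The following plain-Java sketch shows why one element can land in several sliding windows. It enumerates every window of a given size and slide that covers a timestamp. The names (SlidingWindowMath, assignWindows) are hypothetical, the offset is assumed to be zero, and the code is an illustration rather than Flink's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowMath {
    /** Returns [start, end) pairs of all sliding windows containing the timestamp (ms). */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // latest window start that is not after the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // walk backwards while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size 10 s, slide 5 s: each element belongs to size / slide = 2 windows
        for (long[] w : assignWindows(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [10000, 20000) then [5000, 15000)
    }
}
```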
2.1.3 SessionWindow
Uses EventTimeSessionWindows (or ProcessingTimeSessionWindows) as the WindowAssigner. A SessionWindow first creates a separate Window for each message and then merges Windows when the merge condition is met.
DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);
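The "one Window per message, then merge" idea can be sketched in plain Java as follows. Each timestamp gets its own window [t, t + gap), and overlapping or touching windows are merged into one session. The names (SessionMergeSketch, sessions) are hypothetical, a static gap is assumed, and the merge rule shown here is an assumption made for illustration rather than a copy of Flink's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionMergeSketch {
    /** Builds merged session windows [start, end) from element timestamps (ms). */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long t : sorted) {
            // every element starts as its own window [t, t + gap)
            long[] candidate = {t, t + gap};
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && candidate[0] <= last[1]) {
                // overlaps (or touches) the previous session: extend it
                last[1] = Math.max(last[1], candidate[1]);
            } else {
                merged.add(candidate);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1_000L, 4_000L, 20_000L}, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 9000) then [20000, 25000)
    }
}
```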
2.1.4 Global Window
A Global Window uses GlobalWindows as the WindowAssigner. All elements with the same key are assigned to a single Window:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
2.2 WindowFunction
A Window must be followed by the processing logic, the WindowFunction. Flink provides several commonly used WindowFunctions and also supports custom ones.
2.2.1 Common WindowFunctions Provided by Flink
1 ReduceFunction
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            // keep the first field, sum the second field
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });
2 AggregateFunction
// The accumulator holds (running sum, element count); the result is their ratio.
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
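The four methods above follow a create / add / merge / getResult cycle. The plain-Java sketch below walks through that cycle on a (sum, count) accumulator without any Flink dependency. The class name AverageAccumulatorSketch and the use of a long[] as the accumulator are illustrative assumptions, not part of the Flink API.

```java
public class AverageAccumulatorSketch {
    // accumulator layout: [0] = running sum, [1] = element count
    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    static long[] add(long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    static double getResult(long[] acc) {
        return ((double) acc[0]) / acc[1];
    }

    public static void main(String[] args) {
        // two partial accumulators, e.g. from two windows that get merged
        long[] left = add(4L, add(2L, createAccumulator()));
        long[] right = add(9L, createAccumulator());
        // (2 + 4 + 9) / 3, prints 5.0
        System.out.println(getResult(merge(left, right)));
    }
}
```

Only the small accumulator is kept per window, rather than every element, which is the point of aggregating incrementally.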
3 FoldFunction
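```java
DataStream<Tuple2<String, Long>> input = ...;

// Note: fold() is deprecated in later Flink versions in favor of aggregate().
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
        @Override
        public String fold(String acc, Tuple2<String, Long> value) {
            // append the second field of each element to the accumulated string
            return acc + value.f1;
        }
    });
```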
2.2.2 Custom WindowFunction
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        // count the elements buffered in this window
        long count = 0;
        for (Tuple2<String, Long> in : input) {
            count++;
        }
        out.collect("Window: " + context.window() + " count: " + count);
    }
}
2.3 Trigger
A Trigger decides when a Window's operation, such as the user-defined logic, is executed. It has three main callbacks:
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
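onElement is called for every incoming element, onProcessingTime is called when a processing-time timer fires, and onEventTime is called when an event-time timer fires.
A Trigger callback returns one of four results: CONTINUE, FIRE, PURGE, or FIRE_AND_PURGE. CONTINUE does nothing; FIRE triggers the computation without clearing the Window's elements; PURGE clears the Window's elements; FIRE_AND_PURGE triggers the computation and then clears the Window's elements.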
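The difference between the four results can be shown with a small plain-Java simulation. The enum below is a local stand-in that only borrows the result names from Flink; the class TriggerResultSketch, the apply method, and the choice of a sum as the "window function" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TriggerResultSketch {
    /** Local stand-in for the four trigger results (not the Flink class). */
    enum TriggerResult {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /** Applies a result to the window buffer; returns the emitted sum, or null if nothing fired. */
    static Integer apply(TriggerResult result, List<Integer> buffer) {
        Integer emitted = null;
        if (result.fire) {
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            emitted = sum;
        }
        if (result.purge) {
            buffer.clear();
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(apply(TriggerResult.CONTINUE, buffer));       // null, buffer untouched
        System.out.println(apply(TriggerResult.FIRE, buffer));           // 6, buffer kept
        System.out.println(apply(TriggerResult.FIRE_AND_PURGE, buffer)); // 6, buffer cleared
        System.out.println(buffer.size());                               // 0
    }
}
```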
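2.4 Others
An Evictor can process the elements in a Window after the Trigger fires, either before or after the user logic actually runs.
Lateness controls whether late data is tolerated, and for how long.
SideOutput determines what happens to data that arrives after the window end plus the allowed lateness.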
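As a rough illustration of how lateness and side output interact, the plain-Java sketch below classifies an element by comparing the current watermark with the window end and the allowed lateness. The names (LatenessSketch, Fate, classify) are hypothetical, and the exact boundary rule (the last timestamp of a window is end - 1) is an assumption made for this sketch.

```java
public class LatenessSketch {
    enum Fate { ON_TIME, LATE_BUT_ALLOWED, DROPPED_OR_SIDE_OUTPUT }

    /**
     * Classifies an element of the window ending at windowEnd (exclusive, ms),
     * given the watermark at the moment the element arrives.
     */
    static Fate classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // assumed last timestamp inside the window
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME; // window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_BUT_ALLOWED; // window state is still kept
        }
        return Fate.DROPPED_OR_SIDE_OUTPUT; // beyond window end + lateness
    }

    public static void main(String[] args) {
        long end = 10_000L;
        long lateness = 2_000L;
        System.out.println(classify(end, lateness, 5_000L));  // ON_TIME
        System.out.println(classify(end, lateness, 10_500L)); // LATE_BUT_ALLOWED
        System.out.println(classify(end, lateness, 12_000L)); // DROPPED_OR_SIDE_OUTPUT
    }
}
```

With an allowed lateness of zero, the middle case disappears and any element arriving after the window has fired goes straight to the last case.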
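3 Window Lifecycle
The sections above covered the basic concepts and components of a Window. This section walks through the full lifecycle of a Window, using an event-time sliding window as the example.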
DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
The code above measures time by event time: a new window starts every 5 seconds, and each window spans 10 seconds. When the watermark received by the operator passes a window's end time, that window is triggered and its window function is executed.
Figure: overall Window processing flow
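The lifecycle of the 10-second window with a 5-second slide can be approximated with a small plain-Java simulation: elements are assigned to their sliding windows on arrival, and a window fires once the watermark reaches its last timestamp. The names (WindowLifecycleSketch, onElement, onWatermark) are hypothetical, the per-window count stands in for the window function, and lateness handling is left out; this is a sketch under those assumptions, not Flink's operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowLifecycleSketch {
    static final long SIZE = 10_000L;  // window length in ms
    static final long SLIDE = 5_000L;  // a new window starts every 5 s

    // window start -> number of elements buffered for that window
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    /** Assigns the element to every sliding window that covers its timestamp. */
    void onElement(long timestamp) {
        long lastStart = timestamp - Math.floorMod(timestamp, SLIDE);
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            counts.merge(start, 1, Integer::sum);
        }
    }

    /** Fires and removes every window whose last timestamp (end - 1) is covered by the watermark. */
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!counts.isEmpty() && counts.firstKey() + SIZE - 1 <= watermark) {
            Map.Entry<Long, Integer> e = counts.pollFirstEntry();
            fired.add("[" + e.getKey() + ", " + (e.getKey() + SIZE) + ") count=" + e.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        WindowLifecycleSketch op = new WindowLifecycleSketch();
        op.onElement(3_000L); // goes to [-5000, 5000) and [0, 10000)
        op.onElement(7_000L); // goes to [0, 10000) and [5000, 15000)
        System.out.println(op.onWatermark(4_999L)); // fires [-5000, 5000) with count=1
        System.out.println(op.onWatermark(9_999L)); // fires [0, 10000) with count=2
    }
}
```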
References
[1] Windows
https://flink.apache.org/news/2015/12/04/Introducing-windows.html