Flink watermark
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink watermark相关的知识,希望对你有一定的参考价值。
Flink中watermark主要解决保序问题. 而保序问题的根本原因是多个任务同时从流中并行处理数据,顺序无法保证.
上游: 生成watermark
一般在WINDOW 操作之前生成WATERMARK, WATERMARK 有两种:
AssignWithPeriodicWatermarks:
每隔N秒自动向流里注入一个WATERMARK 时间间隔由ExecutionConfig.setAutoWatermarkInterval 决定. 每次调用getCurrentWatermark 方法, 如果得到的WATERMARK 不为空并且比之前的大就注入流中 (emitWatermark)
参考 TimestampsAndPeriodicWatermarksOperator.processElement
AssignWithPunctuatedWatermarks:
基于事件向流里注入一个WATERMARK,每一个元素都有机会判断是否生成一个WATERMARK. 如果得到的WATERMARK 不为空并且比之前的大就注入流中 (emitWatermark)
参考 TimestampsAndPunctuatedWatermarksOperator.processElement
每次生成WATERMARK将覆盖流中已有的WATERMARK
下游: 处理watermark
StatusWatermarkValve 负责将不同Channel 的Watermark 对齐,再传给pipeline 下游,对齐的概念是当前Channel的Watermark时间大于所有Channel最小的Watermark时间
WindowOperator 的处理:
WindowOperator.processElement
- WindowAssigner.assignWindows 为当前的消息分配滑动窗口
常用的有: TumblingEventTimeWindows: 按照消息的 EventTime 分配窗口 (每次生成单个窗口)
TumblingProcessingTimeWindows 按照当前的时间分配窗口 (每次生成单个窗口)
需要配合StreamExecutionEnvironment.setStreamTimeCharacteristic 使用 (默认是TimeCharacteristic.ProcessingTime), 这个必须匹配
否则无法正常触发滑动窗口
实际观察结果:
- 如果使用ProcessingTimeWindows 即使Event 本身的时间落后于窗口时间很多也会被触发
- 无论是否使用WATERMARK,窗口中的数据会有乱序,即后到窗口中的数据早于先到窗口中的数据
- 如果使用EventTimeWindow, 数据和窗口时间对齐不会乱序,同一窗口中的数据不能严格保证顺序,需要SORT.
- 最后一批数据有缺失,缺失的数据取决于WATERMARK的MAXOUTOFORDERNESS
- 默认的WATERMARK算法是根据元素的最大时间决定的,当没有新的元素进入流中的时候,水位不再上涨,再减去MAXOUTOFORDERNESS, 则最后一批数据无法落在水位之下,导致WINDOW无法触发
- 将当前的滑动窗口和对象加入WindowState, 根据不同的应用场景会使用不同的WindowState. WindowState 的类型由WindowedStream的具体操作决定, 生成对应的StateDescriptor, 不同的WindowState 的 add/get 行为会不同. 比如HeapListWindowState 会把当前的对象追加到currentNamespace (即Timewindow) 对应的List 下. 比如HeapAggregateState 会对当前的对象应用Aggregate function 并更新结果
Window 触发的条件
在 WindowOperator 中有两个点会检查窗口是否触发,两者的检查条件有所不同
-
processElement 这是在新的流数据进入时触发
检查条件: watermark时间 >= 窗口最大时间 参见 EventTimeTrigger.onElement
如果窗口不能被触发则调用InteralTimeService.registerEventTimeTimer 注册一个定时器,以KEY+窗口最大时间为条件触发, 到一定时间后定时器会被自动销毁. 时间为窗口最大时间+WindowOperator.allowedLateness WindowOperator.allowedLateness 可以通过 Stream.window(...).allowedLateness(...) 设置. 一般应该略大于WatermarkGenerator 的 maxOutOfOrderness
- onEventTime 或者 onProcessingTime 取决于Watermark的类型, 这是在Watermark更新的时候触发 (InteralTimeService.advanceWatermark). 这时会把当前Watermark 的时间和之前注册的定时器的时间做比较, 如果定时器还存在并且Watermark的时间大于定时器时间则可以触发窗口. 参见 EventTimeTrigger.onEventTime
参考 http://blog.csdn.net/lmalds/article/details/52704170
WATERMARK和普通数据分开处理
如果一个元素来的过晚 element.getTimestamp + allowedLateness < currentWatermark
会有一个特殊的OutputTag 和正常的流数据区分开
参考 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html
如果窗口来的过晚, window.maxTimestamp + allowedLateness < currentWatermark, 则窗口会被直接丢弃
Watermark 的问题:
默认的Watermark机制是数据驱动的,新的数据进入才会触发水位上升, 而由于maxOutOfOrderness 的存在, watermark < 最大流数据时间 < 当前窗口结束时间
根据之前的分析,最新的时间窗口总是不会被触发,除非更新的数据进入再次提高水位到当前窗口结束时间以后, 如果数据进入的频率低或者没有新的数据进入流,那最新的时间窗口被处理的延时会非常高甚至永远不会被触发,这在实时性要求高的流式系统是很致命的. 比如一个银行系统,要做客户账号层面的保序,每个账号的交易可能一天只有几笔甚至一笔,如果我们在Window 处理的时候KEY BY 账号就会引起上述问题. 我们可以考虑KEY BY的条件改为 HASH(账号) 再取模,然后在窗口处理中再次根据账号分组,这样虽然处理复杂一些,但是保证了窗口中数据的频次
另外一种方案是优化WATERMARK生成的机制,如果一段时间后WATERMARK仍然没有变化,那就将WATERMARK自动上涨一次到当前窗口的结束时间,这样保证窗口处理的延时有个上限
public abstract class AbstractWatermarkGenerator<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = -2006930231735705083L;
private static final Logger logger = LoggerFactory.getLogger(AbstractWatermarkGenerator.class);
private final long maxOutOfOrderness; // 10 seconds
private long windowSize;
private long currentMaxTimestamp;
private long lastTimestamp = 0;
private long lastWatermarkChangeTime = 0;
private long windowPurgeTime = 0;
private boolean watermarkIncreased = false;
public AbstractWatermarkGenerator(long maxOutOfOrderness, long windowSize) {
this.maxOutOfOrderness = maxOutOfOrderness;
this.windowSize = windowSize;
}
public AbstractWatermarkGenerator() {
this(10000, 10000);
}
protected abstract long extractCurTimestamp(T element) throws Exception;
public long extractTimestamp(T element,
long previousElementTimestamp) {
try {
long curTimestamp = extractCurTimestamp(element);
currentMaxTimestamp = Math.max(curTimestamp, currentMaxTimestamp);
windowPurgeTime = getWindowExpireTime(currentMaxTimestamp);
if (logger.isDebugEnabled()) {
logger.debug("Extracting timestamp: {}", currentMaxTimestamp);
}
return curTimestamp;
} catch (Exception e) {
logger.error("Error extracting timestamp", e);
}
return 0;
}
protected long getWindowExpireTime(long currentMaxTimestamp) {
long windowStart = TimeWindow.getWindowStartWithOffset(currentMaxTimestamp, 0, windowSize);
long windowEnd = windowStart + windowSize;
return windowEnd + maxOutOfOrderness;
}
public Watermark getCurrentWatermark() {
long curTime = new Date().getTime();
if (currentMaxTimestamp > lastTimestamp) {
if (logger.isDebugEnabled()) {
logger.debug("Current max timestamp has been increased since last");
}
lastTimestamp = currentMaxTimestamp;
lastWatermarkChangeTime = curTime;
watermarkIncreased = false;
}
else {
if (curTime - lastWatermarkChangeTime >= (windowPurgeTime - currentMaxTimestamp)
&& watermarkIncreased == false) {
if (logger.isDebugEnabled()) {
logger.debug("Increase current MaxTimestamp once");
}
currentMaxTimestamp = windowPurgeTime;
lastTimestamp = currentMaxTimestamp;
lastWatermarkChangeTime = curTime;
watermarkIncreased = true;
}
}
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
以上是关于Flink watermark的主要内容,如果未能解决你的问题,请参考以下文章
1.22.FLINK WatermarkFlink窗口(Window)watermark有什么用?如何使用Watermarks处理乱序的数据流?机制及实例详解生成方式代码实例