Flink事件时间和水印详解
Posted 长臂人猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink事件时间和水印详解相关的知识,希望对你有一定的参考价值。
前言
Flink使用版本:1.12.1。
水印是一个标记的时间戳,是一个标记:意味着水印代表时间前的数据均已到达(人为的设定——开发人员可以控制延迟和完整性之间的权衡),这一点水印保障了乱序问题的解决(这很重要,特别是多分区kafka消费)。因为在流处理中,面对乱序问题,你不可能一直等待数据的到达而不去对数据进行操作(尤其像是聚合操作这类操作)。故此面对超时到达的数据你必须进行处理,如何判断超时数据——水印,你也可以设置一定的延迟时间。这两点(解决乱序;允许延迟)也是水印的主要功能。第二点通常与窗口一起使用:水印能拒绝过期数据,但是不能将流式数据进行“短暂”的“批处理”,所以用到窗口。
时间分类
Flink 中的时间分为三种:
事件时间(Event Time)指的是数据产生的时间,这个时间一般由数据生产方自身携带,比如 Kafka 消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间。
- 事件时间(Event Time),即事件实际发生的时间,生产时间;
- 摄入时间(Ingestion Time),事件进入Flink流处理框架的时间;
- 处理时间(Processing Time),事件被处理的时间。
在Flink 1.12版本前可以设置Flink主程时间特征:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置Flink系统使用时间为事件时间EventTime , 即时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
但是在在1.12版本后,该方法setStreamTimeCharacteristic被弃用!(FLIP-134: Batch execution for the DataStream API)时间特征默认为EvnetTime,若使用的是IngestionTime需要在手动实现水印策略WatermarkStrategy 接口
,具体见下文的水印的产生
。
水印的意义
官网有言:如果你想使用事件时间(EventTime),你还需要提供一个时间戳提取器(Timestamp Extractor)和水印生成器(Watermark Generator)。
水印所做的
——它们定义了何时停止等待较早的事件:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记
(窗口结束条件之一),是开发人员可以控制延迟和完整性之间的权衡。与批处理不同,在批处理中,人们可以在产生任何结果之前拥有完整的输入知识,而在流式处理中,则最终必须停止等待查看更多输入,并产生某种结果。流式处理中聚合事件(如count,sum)与批处理不同,需要在一定的范围内运算,而流是无界的,那么我们就需要搭配窗口
来限定这个范围。窗口是这个范围限定,而水印则是范围的延迟
。水印能拒绝过期数据,但是不能将流式数据进行“短暂”的“批处理”,所以用到窗口。
程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。
设计模拟数据源
我们模拟kafka数据自带eventTime时间戳(最后一个),生产数据格式:(oneToic,6)
+ eventTime,逐渐递增。
package com.cbry.windows;
import java.util.Random;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class WindowImageSource implements SourceFunction<Tuple2<String, Long>>{
private static final long serialVersionUID = 1L;
private boolean is_Running = true;
@Override
public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
// TODO Auto-generated method stub
Random random = new Random();
int i = 1;
while(is_Running) {
Tuple2<String, Long> element = new Tuple2<String, Long>("oneToic",(long)i);
//ctx.collect(element);
//生成水印
if(i % 6 ==0) {
ctx.collectWithTimestamp(element, (long)i*1000+2617160286000L);
}else {
ctx.collectWithTimestamp(element, (long)i*1000+1617160286000L);
}
i++;
//每1秒一个数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
// TODO Auto-generated method stub
is_Running=false;
}
}
水印的产生
水印的使用
WatermarkStrategy的静态实现策略
在老版本中Watermark 的生成方式有两种:
- AssignerWithPeriodicWatermarks 生成周期水印,周期默认的时间是 200ms;
- AssignerWithPunctuatedWatermarks 按需生成水印。
为了避免代码的重复,在flink 1.11 中对flink的水印生成接口进行了重构:出现了新的方法:
dataStream.assignTimestampsAndWatermarks( WatermarkStrategy watermarkStrategy )
assignTimestampsAndWatermarks里面的参数可以是 WatermarkStrategy接口
里面开箱即用的静态方法:内置水印生成器
-
forMonotonousTimestamps:为
时间戳单调递增
的情况创建水印策略。这个也就是相当于以event(流元素)中的时间戳充当了水印。 -
forBoundedOutOfOrderness(Duration maxOutOfOrderness):
固定延迟生成水印
:好比下面的自定义的水印生成器,将元素的时间戳,“提前”固定延迟:也就是入参maxOutOfOrderness。比如说event元素的时间戳为11.30分,入参15min,生成水印时间为11.15,我们实际上就允许了11.30这个延迟了15min的数据进入窗口(可能是新窗口,但是被水印拦截掉的不会进入窗口)。关于窗口和水印的结合使用如果有疑问,请看完全文结合用例结果再回过头来看思考一下。
WatermarkStrategy接口
除去静态实现的方法(本质上也官方实现该接口的类),我们关注一下本身这个接口的核心方法,我们可以自己自定义实现该接口实现自己个性化的水印策略
:
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>,
WatermarkGeneratorSupplier<T>{
/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
* strategy.
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
使用WatermarkStrategy
这种方式需要一个流并产生一个带有时间戳元素和水印的新流。如果原始流已经有时间戳和/或水印,时间戳分配器会覆盖它们。对于时间戳和水印该接口需要实现createTimestampAssigner和createWatermarkGenerator方法。
createTimestampAssigner
实例化一个TimestampAssigner用于根据此水印战略分配时间戳。
TimestampAssigner
是一个从事件中提取字段的简单函数。
createWatermarkGenerator
编写一个水印生成器。
对于该方法需要实现的WatermarkGenerator接口
存在两个方法:
-
onEvent:对流数据处理函数;
-
onPeriodicEmit:定期执行函数;
1、定期水印生成器:
onEvent方法什么也不写,赋予周期方法onPeriodicEmit逻辑。
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
// 什么也不做,我们专注于周期生产水印
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
}
}
2、特殊事件水印生成器:
onPeriodicEmit周期方法什么也不做,赋予onEvent方法对每个事件(流数据)逻辑。
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 什么也不做,我们专注于每个事件(流数据)的逻辑
}
}
自定义实现水印生产者
package com.cbry.windows;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* 实现一个简单的周期性的发射水印的例子
* 在这个onEvent方法里,我们从每个元素里抽取了一个时间字段,但是我们并没有生成水印发射给下游,而是自己保存了在一个变量里,在onPeriodicEmit方法里,使用最大的日志时间减去我们想要的延迟时间作为水印发射给下游。
* **/
public class MyWaterMarks implements WatermarkGenerator<Tuple2<String,Long>>{
private long maxTimestamp;
//设置允许乱序时间
private long delay = 5000;
/*
* 为每个事件调用,允许水印生成器检查并记住事件时间戳,或基于事件本身发出水印。
* */
@Override
public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
// TODO Auto-generated method stub
//记录最新的数据时间的值
//maxTimestamp = Math.max(maxTimestamp, event.f1);
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
System.err.println("maxTimestamp: " + maxTimestamp + " eventTimestamp: " + eventTimestamp);
}
/*
* 定期调用,并且可能会发出新的水印。
* */
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// TODO Auto-generated method stub
//返回实际可接纳的时间,保障已有的数据时间 《= 水印
output.emitWatermark(new Watermark(maxTimestamp - delay));
//System.err.println("水印:" + new Watermark(maxTimestamp - delay).toString());
}
}
onEvent函数里面的 说明:即数据携带的时间戳信息;我们用弃用方法:设置成进入程序时间(IngestionTime)。打印的是当前时间,而非数据源产生携带时间。如果不设置(Flink 1.12默认为EventTime)即为正确时间。
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
maxTimestamp = Math.max(maxTimestamp, event.f1); //便于展示:maxTimestamp打印的是event数据内容
自定义水印搭配滚动时间窗口效果
上面的模拟Source,为了便于描述我们从(onTopic,1)
作为第一条数据开始生产。
实现代码
public class WaterMark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置水印生成时间间隔100ms
env.getConfig().setAutoWatermarkInterval(100);
//设置Flink系统使用时间为事件时间EventTime , 即时间特征
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//设置一个延迟6秒的固定水印
DataStream<Tuple2<String,Long>> source = env.addSource(new WindowImageSource());
source.print("in ");
/* env.socketTextStream("localhost", 9999);*/
SingleOutputStreamOperator<Tuple2<String,Long>> dataStream = source.map(data -> new Tuple2<String,Long>(
data.f0,data.f1
// data.split(",")[0], Long.parseLong(data.split(",")[1].trim())
)).returns(Types.TUPLE(Types.STRING, Types.LONG)); //使用Lambda表达式返回必须指定返回类型
dataStream.assignTimestampsAndWatermarks(
//WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(6))
//WatermarkStrategy.forMonotonousTimestamps()
//WatermarkStrategy.forGenerator(new MyWaterMarks);
new WatermarkStrategy<Tuple2<String,Long>>(){
private static final long serialVersionUID = 1L;
@Override
public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(
org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier.Context context) {
// TODO Auto-generated method stub
return new MyWaterMarks();
}
}
).keyBy((event) -> event.f0)
//.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.window(TumblingEventTimeWindows.of(Time.seconds(6)))
//.max(1)
.process(new ProcessWindowFunction<Tuple2<String,Long>, String, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
//集合
ArrayList<Long> conllect = new ArrayList<Long>();
for (Tuple2<String, Long> in: input) {
conllect.add(in.f1);
count++;
}
out.collect("Window: " + context.window() + "count: " + count + " 数据:" + input.toString());
}
})
.printToErr("out ")
;
env.execute("Cbry WaterMark Test");
}
}
水印和窗口初始参数
水印生产器:允许慢5s的数据加入 , 水印比事件时间EventTime提前5s;
滚动窗口6s ;
输出结果
思考第一个窗口的开始时间的区间怎么确定的?答在第一个窗口。
in > (oneToic,1)
maxTimestamp: 1617160287000 eventTimestamp: 1617160287000
in > (oneToic,2)
maxTimestamp: 1617160288000 eventTimestamp: 1617160288000
in > (oneToic,3)
maxTimestamp: 1617160289000 eventTimestamp: 1617160289000
in > (oneToic,4)
maxTimestamp: 1617160290000 eventTimestamp: 1617160290000
in > (oneToic,5)
maxTimestamp: 1617160291000 eventTimestamp: 1617160291000
in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000水印:Watermark @ 2617160287000 (2052-12-07 12:58:07.000)
out > Window: TimeWindow{start=1617160284000, end=1617160290000}count: 3 数据:[(oneToic,1), (oneToic,2), (oneToic,3)]
out > Window: TimeWindow{start=1617160290000, end=1617160296000}count: 2 数据:[(oneToic,4), (oneToic,5)]
in > (oneToic,7)
maxTimestamp: 2617160292000 eventTimestamp: 1617160293000
in > (oneToic,8)
maxTimestamp: 2617160292000 eventTimestamp: 1617160294000… …
水印:Watermark @ 2617160287000 (2052-12-07 12:58:07.000)
in > (oneToic,12)
maxTimestamp: 2617160298000 eventTimestamp: 2617160298000
水印:Watermark @ 2617160293000 (2052-12-07 12:58:13.000)… …
水印:Watermark @ 2617160293000 (2052-12-07 12:58:13.000)
in > (oneToic,18)
maxTimestamp: 2617160304000 eventTimestamp: 2617160304000
水印:Watermark @ 2617160299000 (2052-12-07 12:58:19.000)
out > Window: TimeWindow{start=2617160292000, end=2617160298000}count: 1 数据:[(oneToic,6)]
具体加上水印输出效果图(0.1s有点小,可以适当调大):
分析前,先了解配合使用的eventTimeWindows窗口:
EventTimeWindow
疑惑:数据的事件时间(EventTime)大于窗口,会否终止?
答曰:窗口是左闭右开的,形式为:[window_start_time,window_end_time),故EventTime大于等于窗口建立新窗口。
EventTimeWindow结果是按照Event Time的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。
首先根据event流数据的时间戳EventTime创建第一个窗口,窗口Window会在以下的条件满足时被触发执行:
- watermark时间 >= window_end_time;
- 在[window_start_time,window_end_time)中有数据存在(入窗);
官网有对应的描述:Interaction of watermarks and windows :水印与窗口的交互列出了这两点,但是晦涩难懂,意思就是上面的两点意思。
分析
第一个窗口
第一条数据:1617160287000
第一个窗口:TimeWindow{start=1617160284000, end=1617160290000}
我们在生产数据的时候以6结尾的数据
:(long)i*1000+2617160286000L
in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000
关于窗口初始化的范围:由TumblingEventTimeWindows源码知:由窗口大小size,和
protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger)
public static TumblingEventTimeWindows of(Time size) //多态可设置偏移量:Time offset ,单size构造offset为0
//后面一直追到:WindowStagger.getStaggerOffset —> TimeWindow.getWindowStartWithOffset:
最终层层取余运算只与:窗口大小size
+ 偏移量offset
(这里为0,无关) + 第一个数据时间戳timestamp
+ 当前程序处理时间currentProcessingTime
有关 。
源码有解释道: 窗口的开始和当前处理时间作为偏移量,这样,窗户是交错的。
第二个窗口
第二个窗口:TimeWindow{start=1617160290000, end=1617160296000}
这个时候二窗口从第四个数据(oneToic,4)开始:
in > (oneToic,4)
maxTimestamp: 1617160290000 eventTimestamp: 1617160290000
in > (oneToic,5)
maxTimestamp: 1617160291000 eventTimestamp: 1617160291000
in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000
in > (oneToic,7)
maxTimestamp: 2617160292000 eventTimestamp: 1617160293000
in > (oneToic,8)
maxTimestamp: 2617160292000 eventTimestamp: 1617160294000
第6个数据(oneToic,6)是 2617160292000源自:
原数据产生:
if(i % 6 ==0) {
ctx.collectWithTimestamp(element, (long)i*1000+2617160286000L);
}
这时候水印超出了窗口
时间:
in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000
水印:Watermark @ 2617160287000 (2052-12-07 12:58:07.000)
数据不被窗口(1617160290000-1617160296000 )采纳,生成新的窗口三(2617160292000-2617160298000),同时水印更新最大值为2617160287000
,水印意味着——水印前的数据均已到达,再有舍弃。所以后面的7/8/9均被舍弃。
这里水印为2617160287000
的原因:
//自定义水印MyWaterMarks中 // topic6 // 2617160292000 - 5000 = 2617160287000 output.emitWatermark(new Watermark(maxTimestamp - delay)); System.err.println("水印:" + new Watermark(maxTimestamp - delay).toString());
故此二窗口只有:
count: 2 数据:[(oneToic,4), (oneToic,5)]
第三个窗口
【!important】:且第三个窗口根据新的值(topic6的EventTime)来,且到达topic18
的时候水印:2617160299000
>=窗口的结束时间:2617160298000
触发窗口计算—>生成新的窗口。
水印:Watermark @ 2617160293000 (2052-12-07 12:58:13.000)
in > (oneToic,18)
maxTimestamp: 2617160304000 eventTimestamp: 2617160304000
水印:Watermark @ 2617160299000 (2052-12-07 12:58:19.000)
out > Window: TimeWindow{start=2617160292000, end=2617160298000}count: 1 数据:[(oneToic,6)]
那为什么12没有进去三窗口呢?因为12:eventTimestamp: 2617160298000,刚好在窗口的边界上,窗口左开右闭(>=start 且 <end)。
后面每个窗口存放第6的倍数个数据:
水印:Watermark @ 2617160299000 (2052-12-07 12:58:19.000)
in > (oneToic,24)
maxTimestamp: 2617160310000 eventTimestamp: 2617160310000
水印:Watermark @ 2617160305000 (2052-12-07 12:58:25.000)
out > Window: TimeWindow{start=2617160298000, end=2617160304000}count: 1 数据:[(oneToic,12)]
关于迟到数据处理
有两种方法:
- 窗口允许延迟
- 侧流输出
窗口允许延迟
但是有些业务场景需要我们等待一段时间,也就是接受一定范围的迟到数据,此时 allowedLateness 的设置就显得尤为重要。简单地说,allowedLateness 的设置就是对于那些水印通过窗口的结束时间后,还允许等待一段时间。
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);
侧流输出
我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流。
关于侧流输出请看:Flink 三种数据流分流(推荐旁路分流)
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);
DataStream<Event以上是关于Flink事件时间和水印详解的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.8 Generating Timestamps, Watermarks 生成时间戳, 水印
Apache Flink:使用事件时间方式处理工业数据和延迟数据