Flink 水位线机制WaterMark实践 处理乱序消息
Posted 二十六画生的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 水位线机制WaterMark实践 处理乱序消息相关的知识,希望对你有一定的参考价值。
1 水位线(WaterMark)是一个时间戳,等于当前到达的消息最大时间戳减去配置的延迟时间,水位线是单调递增的,如果有晚到达的早消息也不会更新水位线,因为消息最大时间戳没变
水位线 = 消息最大时间戳 - 配置的INTERVAl(offset)时间
2 新消息到达时,才计算新的水位线,如果水位线大于等于窗口的endTime(左闭右开)则触发窗口计算,反之继续接收后续消息;消息的EventTime大于等于窗口beginTime则保留,反之被丢弃
水位线 >= 窗口的endTime,则触发窗口计算
3 消息的EventTime小于水位线时不一定被丢弃;消息的EventTime小于窗口beginTime时才会被丢弃
4 与window一起使用,可以对乱序到达的消息排序后再处理
5 引入水位线机制的目的是延迟窗口触发计算的时间,使晚到达的早的消息尽可能也能被保留,用于窗口计算,提高数据准确性
版本:flink1.9.2,java1.8
1 滚动窗口,延迟5s,window内不排序 :
package WaterMark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
/**
* @Author you guess
* @Date 2020/6/20 15:55
* @Version 1.0
* @Desc
*/
public class WaterMarkTest
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> dataStream = env.addSource(new SourceFunction<String>()
@Override
public void run(SourceContext<String> ctx) throws Exception
ctx.collect("hello,1553503185000");
Thread.sleep(1000);
ctx.collect("hello,1553503186000");
Thread.sleep(1000);
ctx.collect("hello,1553503187000");
Thread.sleep(1000);
ctx.collect("hello,1553503188000");
Thread.sleep(1000);
ctx.collect("hello,1553503189000");
Thread.sleep(1000);
ctx.collect("hello,1553503190000");
Thread.sleep(1000);
ctx.collect("hello,1553503191000");
Thread.sleep(1000);
ctx.collect("hello,1553503186000");
Thread.sleep(1000);
ctx.collect("hello,1553503187000");
Thread.sleep(1000);
ctx.collect("hello,1553503185000");
Thread.sleep(1000);
ctx.collect("hello,1553503184000"); //丢弃
Thread.sleep(1000);
ctx.collect("hello,1553503183000"); //丢弃
Thread.sleep(1000);
ctx.collect("hello,1553503190000");
Thread.sleep(1000);
ctx.collect("hello,1553503192000");
Thread.sleep(1000);
ctx.collect("hello,1553503193000");
Thread.sleep(1000);
ctx.collect("hello,1553503194000");
Thread.sleep(1000);
ctx.collect("hello,1553503195000");
Thread.sleep(1000);
ctx.collect("hello,1553503196000");
Thread.sleep(1000);
ctx.collect("hello,1553503197000");
Thread.sleep(1000);
ctx.collect("hello,1553503198000");
Thread.sleep(1000);
ctx.collect("hello,1553503199000");
Thread.sleep(1000);
ctx.collect("hello,1553503200000");
Thread.sleep(1000);
ctx.collect("hello,1553503201000");
Thread.sleep(1000);
ctx.collect("hello,1553503202000");
@Override
public void cancel()
, "source1").assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() //周期性水位线
long currentTimeStamp = 0l;
long maxDelayAllowed = 5000l; //延迟5s
long currentWaterMark;
/**
* 周期循环执行,默认是每200ms执行一次该方法
* @return
*/
@Override
public Watermark getCurrentWatermark()
currentWaterMark = currentTimeStamp - maxDelayAllowed;
System.out.println("当前水位线:" + currentWaterMark);
return new Watermark(currentWaterMark);
/**
* 来消息才执行。来消息时先执行该方法extractTimestamp,然后再执行getCurrentWatermark
* @param s
* @param l
* @return
*/
@Override
public long extractTimestamp(String s, long l)
String[] arr = s.split(",");
long eventTime = Long.parseLong(arr[1]);
currentTimeStamp = Math.max(eventTime, currentTimeStamp);
System.out.println("Key:" + arr[0] + ",EventTime:" + eventTime + ",currentTimeStamp:" + currentTimeStamp);
return eventTime;
);
dataStream.map(new MapFunction<String, Tuple2<String, String>>()
@Override
public Tuple2<String, String> map(String s) throws Exception
return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
).keyBy(0)
//.timeWindow(Time.seconds(5)).apply() // 作用同下,一样能获取到窗口开始和结束时间
.window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Tuple2<String, String>, String, Tuple, TimeWindow>()
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, String>> input, Collector<String> out) throws Exception
System.out.println("当前窗口开始时间[" + window.getStart() + ",结束时间" + window.getEnd() + ")");
for (Tuple2<String, String> element : input)
out.collect(" - " + element.f1);
).print();
env.execute("Flink WaterMark Test1");
输出:
Key:hello,EventTime:1553503185000,currentTimeStamp:1553503185000
当前水位线:1553503180000
当前水位线:1553503180000
当前水位线:1553503180000
当前水位线:1553503180000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503186000
当前水位线:1553503181000
当前水位线:1553503181000
当前水位线:1553503181000
当前水位线:1553503181000
当前水位线:1553503181000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503187000
当前水位线:1553503182000
当前水位线:1553503182000
当前水位线:1553503182000
当前水位线:1553503182000
当前水位线:1553503182000
Key:hello,EventTime:1553503188000,currentTimeStamp:1553503188000
当前水位线:1553503183000
当前水位线:1553503183000
当前水位线:1553503183000
当前水位线:1553503183000
当前水位线:1553503183000
Key:hello,EventTime:1553503189000,currentTimeStamp:1553503189000
当前水位线:1553503184000
当前水位线:1553503184000
当前水位线:1553503184000
当前水位线:1553503184000
当前水位线:1553503184000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503190000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
Key:hello,EventTime:1553503191000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503185000,currentTimeStamp:1553503191000 --消息EventTime小于水位线,消息EventTime大于等于窗口beginTime,保留
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503184000,currentTimeStamp:1553503191000 --消息EventTime小于窗口beginTime,被丢弃
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503183000,currentTimeStamp:1553503191000 --消息EventTime小于窗口beginTime,被丢弃
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503192000,currentTimeStamp:1553503192000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
Key:hello,EventTime:1553503193000,currentTimeStamp:1553503193000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
Key:hello,EventTime:1553503194000,currentTimeStamp:1553503194000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
Key:hello,EventTime:1553503195000,currentTimeStamp:1553503195000
当前水位线:1553503190000
当前水位线:1553503190000
当前窗口开始时间[1553503185000,结束时间1553503190000) -- 当前水位线:1553503190000 >= 窗口结束时间1553503190000 才触发计算
4> - 1553503185000
4> - 1553503186000
4> - 1553503187000
4> - 1553503188000
4> - 1553503189000
4> - 1553503186000 --放在第一个窗口里
4> - 1553503187000 --放在第一个窗口里
4> - 1553503185000 --放在第一个窗口里,消息1553503184000,消息1553503183000被丢弃
当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
Key:hello,EventTime:1553503196000,currentTimeStamp:1553503196000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503197000,currentTimeStamp:1553503197000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
Key:hello,EventTime:1553503198000,currentTimeStamp:1553503198000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
Key:hello,EventTime:1553503199000,currentTimeStamp:1553503199000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
Key:hello,EventTime:1553503200000,currentTimeStamp:1553503200000
当前水位线:1553503195000
当前窗口开始时间[1553503190000,结束时间1553503195000) -- 当前水位线:1553503195000 >= 窗口结束时间1553503195000 才触发计算
4> - 1553503190000
4> - 1553503191000
4> - 1553503190000
4> - 1553503192000
4> - 1553503193000
4> - 1553503194000
当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
Key:hello,EventTime:1553503201000,currentTimeStamp:1553503201000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
Key:hello,EventTime:1553503202000,currentTimeStamp:1553503202000 --末尾元素入窗,也会触发计算
当前水位线:1553503197000
当前窗口开始时间[1553503195000,结束时间1553503200000)
4> - 1553503195000
4> - 1553503196000
4> - 1553503197000
4> - 1553503198000
4> - 1553503199000
当前窗口开始时间[1553503200000,结束时间1553503205000)
4> - 1553503200000
4> - 1553503201000
4> - 1553503202000
2 修改1中的代码,在window中加入排序:
dataStream.map(new MapFunction<String, Tuple2<String, String>>()
@Override
public Tuple2<String, String> map(String s) throws Exception
return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
).keyBy(0)
//.timeWindow(Time.seconds(5)).apply() // 作用同下,一样能获取到窗口开始和结束时间
.window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Tuple2<String, String>, String, Tuple, TimeWindow>()
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, String>> input, Collector<String> out) throws Exception
System.out.println("当前窗口开始时间[" + window.getStart() + ",结束时间" + window.getEnd() + ")");
List<Tuple2<String, String>> list = new ArrayList<>();
input.forEach(o -> list.add(o));
list.sort((o1, o2) -> o1.f1.compareTo(o2.f1));
//list.sort(Comparator.comparing(o -> o.f1)); // 与上句代码同义,按照第二个属性升序排序
list.forEach(o -> out.collect(" - " + o.f1));
).print();
env.execute("Flink WaterMark Test2");
输出:
............
Key:hello,EventTime:1553503195000,currentTimeStamp:1553503195000
当前水位线:1553503190000
当前窗口开始时间[1553503185000,结束时间1553503190000) -- 可见已对乱序到达的消息做了升序排序和处理
4> - 1553503185000
4> - 1553503185000
4> - 1553503186000
4> - 1553503186000
4> - 1553503187000
4> - 1553503187000
4> - 1553503188000
4> - 1553503189000
当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
Key:hello,EventTime:1553503196000,currentTimeStamp:1553503196000
当前水位线:1553503191000
............
Key:hello,EventTime:1553503200000,currentTimeStamp:1553503200000
当前水位线:1553503195000
当前窗口开始时间[1553503190000,结束时间1553503195000) -- 可见已对乱序到达的消息做了升序排序和处理
4> - 1553503190000
4> - 1553503190000
4> - 1553503191000
4> - 1553503192000
4> - 1553503193000
4> - 1553503194000
当前水位线:1553503195000
3 修改1的代码,不延迟,其他代码不动:
long maxDelayAllowed = 0l; //不延迟
输出:
Key:hello,EventTime:1553503185000,currentTimeStamp:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
Key:hello,EventTime:1553503188000,currentTimeStamp:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
Key:hello,EventTime:1553503189000,currentTimeStamp:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503190000
当前水位线:1553503190000
当前窗口开始时间[1553503185000,结束时间1553503190000) -- 当前水位线:1553503190000 >= 窗口结束时间1553503190000 才触发计算
4> - 1553503185000
4> - 1553503186000
4> - 1553503187000
4> - 1553503188000
4> - 1553503189000
当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
Key:hello,EventTime:1553503191000,currentTimeStamp:1553503191000 -- 在第二个窗口
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503191000 -- 在第二个窗口被丢弃,因为EventTime小于
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503191000 -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503185000,currentTimeStamp:1553503191000 -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503184000,currentTimeStamp:1553503191000 -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503183000,currentTimeStamp:1553503191000 -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503191000 -- EventTime大于等于窗口beginTime,保留
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503192000,currentTimeStamp:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
Key:hello,EventTime:1553503193000,currentTimeStamp:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
Key:hello,EventTime:1553503194000,currentTimeStamp:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
Key:hello,EventTime:1553503195000,currentTimeStamp:1553503195000
当前水位线:1553503195000
当前窗口开始时间[1553503190000,结束时间1553503195000)
4> - 1553503190000
4> - 1553503191000
4> - 1553503190000
4> - 1553503192000
4> - 1553503193000
4> - 1553503194000
当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
Key:hello,EventTime:1553503196000,currentTimeStamp:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
Key:hello,EventTime:1553503197000,currentTimeStamp:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
Key:hello,EventTime:1553503198000,currentTimeStamp:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
Key:hello,EventTime:1553503199000,currentTimeStamp:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
Key:hello,EventTime:1553503200000,currentTimeStamp:1553503200000
当前水位线:1553503200000
当前窗口开始时间[1553503195000,结束时间1553503200000)
4> - 1553503195000
4> - 1553503196000
4> - 1553503197000
4> - 1553503198000
4> - 1553503199000
当前水位线:1553503200000
当前水位线:1553503200000
当前水位线:1553503200000
当前水位线:1553503200000
Key:hello,EventTime:1553503201000,currentTimeStamp:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
Key:hello,EventTime:1553503202000,currentTimeStamp:1553503202000
当前水位线:1553503202000
当前窗口开始时间[1553503200000,结束时间1553503205000)
4> - 1553503200000
4> - 1553503201000
4> - 1553503202000
参考:
Flink流计算编程--watermark(水位线)简介_720_tigerMouse的博客-CSDN博客_watermark
Flink WaterMark简介 | LouisvV's Blog
Flink WaterMark实例 | LouisvV's Blog
以上是关于Flink 水位线机制WaterMark实践 处理乱序消息的主要内容,如果未能解决你的问题,请参考以下文章
Flink 窗口函数处理数据(Watermark和SideOutput)
flink 时间语义水位线(Watermark)生成水位线水位线的传递