Flink 水位线机制WaterMark实践 处理乱序消息

Posted 二十六画生的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 水位线机制WaterMark实践 处理乱序消息相关的知识,希望对你有一定的参考价值。

1 水位线(WaterMark)是一个时间戳,等于当前到达的消息最大时间戳减去配置的延迟时间,水位线是单调递增的,如果有晚到达的早消息也不会更新水位线,因为消息最大时间戳没变

水位线 = 消息最大时间戳 - 配置的INTERVAl(offset)时间

2 新消息到达时,才计算新的水位线,如果水位线大于等于窗口的endTime(左闭右开)则触发窗口计算,反之继续接收后续消息;消息的EventTime大于等于窗口beginTime则保留,反之被丢弃

水位线 >= 窗口的endTime,则触发窗口计算

3 消息的EventTime小于水位线时不一定被丢弃;消息的EventTime小于窗口beginTime时才会被丢弃

4 与window一起使用,可以对乱序到达的消息排序后再处理

5 引入水位线机制的目的是延迟窗口触发计算的时间,使晚到达的早的消息尽可能也能被保留,用于窗口计算,提高数据准确性

版本:flink1.9.2,java1.8

1 滚动窗口,延迟5s,window内不排序 :

package WaterMark;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Date;

/**
 * @Author you guess
 * @Date 2020/6/20 15:55
 * @Version 1.0
 * @Desc
 */
public class WaterMarkTest 

    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> dataStream = env.addSource(new SourceFunction<String>() 
            @Override
            public void run(SourceContext<String> ctx) throws Exception 
                ctx.collect("hello,1553503185000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503186000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503187000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503188000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503189000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503190000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503191000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503186000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503187000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503185000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503184000"); //丢弃
                Thread.sleep(1000);
                ctx.collect("hello,1553503183000"); //丢弃
                Thread.sleep(1000);
                ctx.collect("hello,1553503190000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503192000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503193000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503194000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503195000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503196000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503197000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503198000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503199000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503200000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503201000");
                Thread.sleep(1000);
                ctx.collect("hello,1553503202000");
            

            @Override
            public void cancel() 

            
        , "source1").assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>()  //周期性水位线
            long currentTimeStamp = 0l;
            long maxDelayAllowed = 5000l; //延迟5s
            long currentWaterMark;

            /**
             * 周期循环执行,默认是每200ms执行一次该方法
             * @return
             */
            @Override
            public Watermark getCurrentWatermark() 
                currentWaterMark = currentTimeStamp - maxDelayAllowed;
                System.out.println("当前水位线:" + currentWaterMark);
                return new Watermark(currentWaterMark);
            

            /**
             * 来消息才执行。来消息时先执行该方法extractTimestamp,然后再执行getCurrentWatermark
             * @param s
             * @param l
             * @return
             */
            @Override
            public long extractTimestamp(String s, long l) 
                String[] arr = s.split(",");
                long eventTime = Long.parseLong(arr[1]);
                currentTimeStamp = Math.max(eventTime, currentTimeStamp);
                System.out.println("Key:" + arr[0] + ",EventTime:" + eventTime + ",currentTimeStamp:" + currentTimeStamp);
                return eventTime;
            
        );

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() 
            @Override
            public Tuple2<String, String> map(String s) throws Exception 
                return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
            
        ).keyBy(0)
                //.timeWindow(Time.seconds(5)).apply() // 作用同下,一样能获取到窗口开始和结束时间
                .window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Tuple2<String, String>, String, Tuple, TimeWindow>() 
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, String>> input, Collector<String> out) throws Exception 
                System.out.println("当前窗口开始时间[" + window.getStart() + ",结束时间" + window.getEnd() + ")");
                for (Tuple2<String, String> element : input) 
                    out.collect(" - " + element.f1);
                
            
        ).print();

        env.execute("Flink WaterMark Test1");
    

输出:

Key:hello,EventTime:1553503185000,currentTimeStamp:1553503185000
当前水位线:1553503180000
当前水位线:1553503180000
当前水位线:1553503180000
当前水位线:1553503180000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503186000
当前水位线:1553503181000
当前水位线:1553503181000
当前水位线:1553503181000
当前水位线:1553503181000
当前水位线:1553503181000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503187000
当前水位线:1553503182000
当前水位线:1553503182000
当前水位线:1553503182000
当前水位线:1553503182000
当前水位线:1553503182000
Key:hello,EventTime:1553503188000,currentTimeStamp:1553503188000
当前水位线:1553503183000
当前水位线:1553503183000
当前水位线:1553503183000
当前水位线:1553503183000
当前水位线:1553503183000
Key:hello,EventTime:1553503189000,currentTimeStamp:1553503189000
当前水位线:1553503184000
当前水位线:1553503184000
当前水位线:1553503184000
当前水位线:1553503184000
当前水位线:1553503184000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503190000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
Key:hello,EventTime:1553503191000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503185000,currentTimeStamp:1553503191000  --消息EventTime小于水位线,消息EventTime大于等于窗口beginTime,保留
当前水位线:1553503186000

当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503184000,currentTimeStamp:1553503191000 --消息EventTime小于窗口beginTime,被丢弃
当前水位线:1553503186000

当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503183000,currentTimeStamp:1553503191000 --消息EventTime小于窗口beginTime,被丢弃
当前水位线:1553503186000

当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503191000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503192000,currentTimeStamp:1553503192000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
Key:hello,EventTime:1553503193000,currentTimeStamp:1553503193000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
Key:hello,EventTime:1553503194000,currentTimeStamp:1553503194000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
Key:hello,EventTime:1553503195000,currentTimeStamp:1553503195000
当前水位线:1553503190000
当前水位线:1553503190000
当前窗口开始时间[1553503185000,结束时间1553503190000) -- 当前水位线:1553503190000 >= 窗口结束时间1553503190000 才触发计算
4>  - 1553503185000
4>  - 1553503186000
4>  - 1553503187000
4>  - 1553503188000
4>  - 1553503189000
4>  - 1553503186000 --放在第一个窗口里
4>  - 1553503187000 --放在第一个窗口里
4>  - 1553503185000 --放在第一个窗口里,消息1553503184000,消息1553503183000被丢弃

当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
Key:hello,EventTime:1553503196000,currentTimeStamp:1553503196000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503197000,currentTimeStamp:1553503197000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
Key:hello,EventTime:1553503198000,currentTimeStamp:1553503198000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
Key:hello,EventTime:1553503199000,currentTimeStamp:1553503199000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
Key:hello,EventTime:1553503200000,currentTimeStamp:1553503200000
当前水位线:1553503195000
当前窗口开始时间[1553503190000,结束时间1553503195000) -- 当前水位线:1553503195000 >= 窗口结束时间1553503195000 才触发计算
4>  - 1553503190000 
4>  - 1553503191000
4>  - 1553503190000
4>  - 1553503192000
4>  - 1553503193000
4>  - 1553503194000

当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
Key:hello,EventTime:1553503201000,currentTimeStamp:1553503201000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
Key:hello,EventTime:1553503202000,currentTimeStamp:1553503202000 --末尾元素入窗,也会触发计算
当前水位线:1553503197000
当前窗口开始时间[1553503195000,结束时间1553503200000)
4>  - 1553503195000
4>  - 1553503196000
4>  - 1553503197000
4>  - 1553503198000
4>  - 1553503199000
当前窗口开始时间[1553503200000,结束时间1553503205000)
4>  - 1553503200000
4>  - 1553503201000
4>  - 1553503202000

 

2 修改1中的代码,在window中加入排序:

 dataStream.map(new MapFunction<String, Tuple2<String, String>>() 
            @Override
            public Tuple2<String, String> map(String s) throws Exception 
                return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
            
        ).keyBy(0)
                //.timeWindow(Time.seconds(5)).apply() // 作用同下,一样能获取到窗口开始和结束时间
                .window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<Tuple2<String, String>, String, Tuple, TimeWindow>() 
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, String>> input, Collector<String> out) throws Exception 
                System.out.println("当前窗口开始时间[" + window.getStart() + ",结束时间" + window.getEnd() + ")");
                List<Tuple2<String, String>> list = new ArrayList<>();
                input.forEach(o -> list.add(o));
                list.sort((o1, o2) -> o1.f1.compareTo(o2.f1));
                //list.sort(Comparator.comparing(o -> o.f1)); // 与上句代码同义,按照第二个属性升序排序
                list.forEach(o -> out.collect(" - " + o.f1));
            
        ).print();

        env.execute("Flink WaterMark Test2");

输出:

............

Key:hello,EventTime:1553503195000,currentTimeStamp:1553503195000
当前水位线:1553503190000
当前窗口开始时间[1553503185000,结束时间1553503190000) -- 可见已对乱序到达的消息做了升序排序和处理
4>  - 1553503185000
4>  - 1553503185000
4>  - 1553503186000
4>  - 1553503186000
4>  - 1553503187000
4>  - 1553503187000
4>  - 1553503188000
4>  - 1553503189000

当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
Key:hello,EventTime:1553503196000,currentTimeStamp:1553503196000
当前水位线:1553503191000

............

Key:hello,EventTime:1553503200000,currentTimeStamp:1553503200000
当前水位线:1553503195000
当前窗口开始时间[1553503190000,结束时间1553503195000) -- 可见已对乱序到达的消息做了升序排序和处理
4>  - 1553503190000
4>  - 1553503190000
4>  - 1553503191000
4>  - 1553503192000
4>  - 1553503193000
4>  - 1553503194000

当前水位线:1553503195000

3 修改1的代码,不延迟,其他代码不动:

            long maxDelayAllowed = 0l; //不延迟

输出:

Key:hello,EventTime:1553503185000,currentTimeStamp:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
当前水位线:1553503185000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
当前水位线:1553503186000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
当前水位线:1553503187000
Key:hello,EventTime:1553503188000,currentTimeStamp:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
当前水位线:1553503188000
Key:hello,EventTime:1553503189000,currentTimeStamp:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
当前水位线:1553503189000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503190000
当前水位线:1553503190000
当前窗口开始时间[1553503185000,结束时间1553503190000) -- 当前水位线:1553503190000 >= 窗口结束时间1553503190000 才触发计算
4>  - 1553503185000
4>  - 1553503186000
4>  - 1553503187000
4>  - 1553503188000
4>  - 1553503189000

当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
当前水位线:1553503190000
Key:hello,EventTime:1553503191000,currentTimeStamp:1553503191000 -- 在第二个窗口
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503186000,currentTimeStamp:1553503191000 -- 在第二个窗口被丢弃,因为EventTime小于
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503187000,currentTimeStamp:1553503191000  -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503185000,currentTimeStamp:1553503191000  -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503184000,currentTimeStamp:1553503191000  -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503183000,currentTimeStamp:1553503191000  -- 在第二个窗口被丢弃
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503190000,currentTimeStamp:1553503191000 -- EventTime大于等于窗口beginTime,保留
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
当前水位线:1553503191000
Key:hello,EventTime:1553503192000,currentTimeStamp:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
当前水位线:1553503192000
Key:hello,EventTime:1553503193000,currentTimeStamp:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
当前水位线:1553503193000
Key:hello,EventTime:1553503194000,currentTimeStamp:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
当前水位线:1553503194000
Key:hello,EventTime:1553503195000,currentTimeStamp:1553503195000
当前水位线:1553503195000
当前窗口开始时间[1553503190000,结束时间1553503195000)
4>  - 1553503190000
4>  - 1553503191000
4>  - 1553503190000
4>  - 1553503192000
4>  - 1553503193000
4>  - 1553503194000
当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
当前水位线:1553503195000
Key:hello,EventTime:1553503196000,currentTimeStamp:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
当前水位线:1553503196000
Key:hello,EventTime:1553503197000,currentTimeStamp:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
当前水位线:1553503197000
Key:hello,EventTime:1553503198000,currentTimeStamp:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
当前水位线:1553503198000
Key:hello,EventTime:1553503199000,currentTimeStamp:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
当前水位线:1553503199000
Key:hello,EventTime:1553503200000,currentTimeStamp:1553503200000
当前水位线:1553503200000
当前窗口开始时间[1553503195000,结束时间1553503200000)
4>  - 1553503195000
4>  - 1553503196000
4>  - 1553503197000
4>  - 1553503198000
4>  - 1553503199000
当前水位线:1553503200000
当前水位线:1553503200000
当前水位线:1553503200000
当前水位线:1553503200000
Key:hello,EventTime:1553503201000,currentTimeStamp:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
当前水位线:1553503201000
Key:hello,EventTime:1553503202000,currentTimeStamp:1553503202000
当前水位线:1553503202000
当前窗口开始时间[1553503200000,结束时间1553503205000)
4>  - 1553503200000
4>  - 1553503201000
4>  - 1553503202000

参考:

Flink流计算编程--watermark(水位线)简介_720_tigerMouse的博客-CSDN博客_watermark

Flink WaterMark简介 | LouisvV's Blog

Flink WaterMark实例 | LouisvV's Blog

以上是关于Flink 水位线机制WaterMark实践 处理乱序消息的主要内容,如果未能解决你的问题,请参考以下文章

[白话解析] Flink的Watermark机制

Flink 窗口函数处理数据(Watermark和SideOutput)

flink窗口与水位线watermark例子

flink 时间语义水位线(Watermark)生成水位线水位线的传递

12-flink-1.10.1-Flink中的时间语义和watermark

12-flink-1.10.1-Flink中的时间语义和watermark