Printing the current Watermark in Flink

Posted 小基基o_O

1. Environment

Windows 10 + IntelliJ IDEA 2021 + JDK 1.8 + Flink 1.14

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2. Test 1

2.1. Java code

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Hi {
    public static void main(String[] args) throws Exception {
        // Create the execution environment and set the parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Create the stream; assign event-time timestamps and the watermark strategy
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
        // For every element, print the current watermark, the input value and the event time
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() {
            @Override
            public void processElement(Long aLong, Context context, Collector<String> collector) {
                collector.collect("current watermark: " + context.timerService().currentWatermark());
                collector.collect("input value: " + aLong);
                collector.collect("event time: " + context.timestamp());
                collector.collect("------------------------------------------");
            }
        }).print();
        // Run the job
        env.execute();
    }

    public static class AutomatedSource implements SourceFunction<Long> {
        public AutomatedSource() {}

        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException {
            // Values are multiplied by 1000 downstream to become event times in milliseconds
            Long[] ls = {1L, 2L, 3L, 4L, 6L, 5L, 9L, 8L, 7L};
            for (Long l : ls) {
                // Slightly longer than the default 200 ms watermark interval
                Thread.sleep(201L);
                sc.collect(l);
            }
        }

        @Override
        public void cancel() {}
    }
}

2.2. Test result

current watermark: -9223372036854775808
input value: 1
event time: 1000
------------------------------------------
current watermark: 999
input value: 2
event time: 2000
------------------------------------------
current watermark: 1999
input value: 3
event time: 3000
------------------------------------------
current watermark: 2999
input value: 4
event time: 4000
------------------------------------------
current watermark: 3999
input value: 6
event time: 6000
------------------------------------------
current watermark: 5999
input value: 5
event time: 5000
------------------------------------------
current watermark: 5999
input value: 9
event time: 9000
------------------------------------------
current watermark: 8999
input value: 8
event time: 8000
------------------------------------------
current watermark: 8999
input value: 7
event time: 7000
------------------------------------------

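2.3. Explanation of the result

  • The initial Watermark is Long.MIN_VALUE, i.e. -9223372036854775808.
  • By default the Watermark is refreshed automatically every 200 ms.
    The source emits one record every 201 ms, so in this run the Watermark has already been updated by the previous record when the next record arrives.
  • The watermark strategy in this code is for monotonously increasing timestamps (forMonotonousTimestamps):
    $\text{Watermark} = \text{max event time} - 1\,\text{ms}$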
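
The rule "Watermark = max event time - 1 ms" can be checked without starting a Flink job. Below is a minimal plain-Java sketch, not Flink's own implementation: the class name WatermarkFormulaSketch and the method watermarksSeen are hypothetical names made up for this illustration. It assumes that exactly one periodic watermark emission happens between every two consecutive records, which is what the 201 ms send interval against the 200 ms update interval achieves in the test above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it does not use any Flink API.
public class WatermarkFormulaSketch {

    /**
     * Returns the watermark that each record would observe on arrival,
     * assuming one periodic emission takes place between consecutive records.
     */
    public static List<Long> watermarksSeen(long[] eventTimesMs, long maxOutOfOrdernessMs) {
        List<Long> seen = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;
        // Start value chosen so that the formula yields Long.MIN_VALUE before any record arrives
        long maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        for (long ts : eventTimesMs) {
            // The record sees the watermark produced by the records before it
            seen.add(currentWatermark);
            maxTimestamp = Math.max(maxTimestamp, ts);
            // Simulated periodic emission; a watermark never moves backwards
            currentWatermark = Math.max(currentWatermark, maxTimestamp - maxOutOfOrdernessMs - 1);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Event times of Test 1 (input values multiplied by 1000)
        long[] eventTimes = {1000, 2000, 3000, 4000, 6000, 5000, 9000, 8000, 7000};
        // An allowed out-of-orderness of 0 ms corresponds to the monotonous strategy
        System.out.println(watermarksSeen(eventTimes, 0L));
    }
}
```

With an out-of-orderness of 0 ms the sketch yields the same sequence of watermarks as the test result in 2.2: Long.MIN_VALUE first, then 999, 1999, 2999, 3999, 5999, 5999, 8999, 8999. The late records 5000, 8000 and 7000 do not change the watermark because they do not raise the maximum event time.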

3. Test 2

3.1. Java code

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class Hi {
    public static void main(String[] args) throws Exception {
        // Create the execution environment and set the parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Set the automatic watermark interval (one update every 2 seconds)
        env.getConfig().setAutoWatermarkInterval(2000L);
        // Create the stream; watermark strategy: bounded out-of-orderness of 3 seconds
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
        // For every element, print the current watermark and the event time
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() {
            @Override
            public void processElement(Long aLong, Context context, Collector<String> collector) {
                collector.collect("current watermark: " + context.timerService().currentWatermark());
                collector.collect("event time: " + context.timestamp());
                collector.collect("------------------------------------------");
            }
        }).print();
        // Run the job
        env.execute();
    }

    public static class AutomatedSource implements SourceFunction<Long> {
        public AutomatedSource() {}

        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException {
            // Values are multiplied by 1000 downstream to become event times in milliseconds
            Long[] ls = {1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L, 10L, 0L, 0L};
            for (Long l : ls) {
                // Roughly two records per 2000 ms watermark interval
                Thread.sleep(1001L);
                sc.collect(l);
            }
        }

        @Override
        public void cancel() {}
    }
}

3.2. Test result

current watermark: -9223372036854775808
event time: 1000
------------------------------------------
current watermark: -2001
event time: 2000
------------------------------------------
current watermark: -2001
event time: 3000
------------------------------------------
current watermark: -1
event time: 4000
------------------------------------------
current watermark: -1
event time: 5000
------------------------------------------
current watermark: 1999
event time: 6000
------------------------------------------
current watermark: 1999
event time: 9000
------------------------------------------
current watermark: 5999
event time: 8000
------------------------------------------
current watermark: 5999
event time: 7000
------------------------------------------
current watermark: 5999
event time: 10000
------------------------------------------
current watermark: 5999
event time: 0
------------------------------------------
current watermark: 6999
event time: 0
------------------------------------------

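3.3. Explanation of the result

  • The automatic Watermark update interval is changed from the default to 2000 ms.
    The source emits one record every 1001 ms, so when the next record arrives the Watermark has not necessarily been updated by the previous record yet.
  • The watermark strategy in this code:
    $\text{Watermark} = \text{max event time} - 3\,\text{s} - 1\,\text{ms}$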
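
To see why the Watermark lags behind by up to two records here, the timing can be replayed on a simulated clock. The following plain-Java sketch is an idealized model, not Flink code: the class PeriodicWatermarkTimeline and its method are hypothetical names, and it assumes that the source and the periodic emission timer start at the same instant and that processing takes no time. A real job has jitter, so the exact values can differ between runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, idealized timeline model for illustration; it does not use any Flink API.
public class PeriodicWatermarkTimeline {

    /**
     * Returns the watermark each record observes on arrival when records are sent
     * every sendIntervalMs and the watermark is refreshed every emitIntervalMs.
     */
    public static List<Long> watermarksSeen(long[] eventTimesMs, long sendIntervalMs,
                                            long emitIntervalMs, long maxOutOfOrdernessMs) {
        List<Long> seen = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;
        // Start value chosen so that the formula yields Long.MIN_VALUE before any record arrives
        long maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        long nextEmit = emitIntervalMs; // simulated time of the next periodic emission
        for (int i = 0; i < eventTimesMs.length; i++) {
            // The source sleeps sendIntervalMs before every record
            long arrival = (i + 1) * sendIntervalMs;
            // Fire every periodic emission that is due before this record arrives
            while (nextEmit <= arrival) {
                currentWatermark = Math.max(currentWatermark, maxTimestamp - maxOutOfOrdernessMs - 1);
                nextEmit += emitIntervalMs;
            }
            seen.add(currentWatermark);
            maxTimestamp = Math.max(maxTimestamp, eventTimesMs[i]);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Event times of Test 2 (input values multiplied by 1000)
        long[] eventTimes = {1000, 2000, 3000, 4000, 5000, 6000, 9000, 8000, 7000, 10000, 0, 0};
        // 1001 ms between records, 2000 ms between watermark updates, 3 s out-of-orderness
        System.out.println(watermarksSeen(eventTimes, 1001L, 2000L, 3000L));
    }
}
```

Under these assumptions the model gives the same watermark sequence as the test result in 3.2. For example, the emission at 2000 ms has only seen event time 1000, so the records arriving at 2002 ms and 3003 ms both observe 1000 - 3000 - 1 = -2001.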
