Printing the Current Watermark in Flink
Posted by 小基基o_O
1. Environment
Windows 10 + IntelliJ IDEA 2021 + JDK 1.8 + Flink 1.14
```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
2. Test 1
2.1. Java code
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Hi {
    public static void main(String[] args) throws Exception {
        // Create the execution environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Create the stream and set the event-time watermark strategy (monotonously increasing timestamps);
        // each value is treated as seconds and converted to a millisecond timestamp
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
        // Read the current watermark through the timer service and print it with each record
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() {
            @Override
            public void processElement(Long aLong, Context context, Collector<String> collector) {
                collector.collect("Current watermark: " + context.timerService().currentWatermark());
                collector.collect("Input value: " + aLong);
                collector.collect("Event time: " + context.timestamp());
                collector.collect("------------------------------------------");
            }
        }).print();
        // Execute the job
        env.execute();
    }

    // Finite test source: emits one value roughly every 201 ms
    public static class AutomatedSource implements SourceFunction<Long> {
        private volatile boolean running = true;

        public AutomatedSource() {
        }

        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException {
            Long[] ls = {1L, 2L, 3L, 4L, 6L, 5L, 9L, 8L, 7L};
            for (Long l : ls) {
                if (!running) {
                    break;
                }
                Thread.sleep(201L);
                sc.collect(l);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
```
2.2. Test results
```
Current watermark: -9223372036854775808
Input value: 1
Event time: 1000
------------------------------------------
Current watermark: 999
Input value: 2
Event time: 2000
------------------------------------------
Current watermark: 1999
Input value: 3
Event time: 3000
------------------------------------------
Current watermark: 2999
Input value: 4
Event time: 4000
------------------------------------------
Current watermark: 3999
Input value: 6
Event time: 6000
------------------------------------------
Current watermark: 5999
Input value: 5
Event time: 5000
------------------------------------------
Current watermark: 5999
Input value: 9
Event time: 9000
------------------------------------------
Current watermark: 8999
Input value: 8
Event time: 8000
------------------------------------------
Current watermark: 8999
Input value: 7
Event time: 7000
------------------------------------------
```
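2.3. Explanation of the results
- The initial watermark is `Long.MIN_VALUE`, i.e. -9223372036854775808.
- By default, the watermark is updated automatically every 200 milliseconds. The source emits one record every 201 milliseconds, which ensures that by the time the next record arrives, the watermark has already been updated from the previous record.
- The watermark strategy in this code is monotonously increasing timestamps (`forMonotonousTimestamps`):
  $\text{Watermark} = \text{max event time} - 1\,\text{ms}$
  Here "max event time" is the largest event time seen so far, which is why the watermark does not move back when the out-of-order records 5, 8 and 7 arrive.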
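The formula above can be mimicked in plain Java without Flink. This is a simplified sketch, not Flink code: `BoundedLagGenerator` and `watermarksSeenByRecords` are hypothetical names. It is modeled on how, to my understanding, Flink's `BoundedOutOfOrdernessWatermarks` behaves (track the largest timestamp, report `maxTimestamp - outOfOrderness - 1` on each periodic emission), with `forMonotonousTimestamps` corresponding to an out-of-orderness of 0. It also assumes, as holds in this test, that one periodic emission always happens between two consecutive records, so each record sees the watermark computed from all earlier records.

```java
import java.util.ArrayList;
import java.util.List;

public class MonotonousWatermarkSketch {

    // Hypothetical stand-in for a periodic watermark generator (not a Flink class):
    // remembers the largest timestamp and reports maxTimestamp - outOfOrdernessMillis - 1.
    static final class BoundedLagGenerator {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        BoundedLagGenerator(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Start low enough that the first reported value is exactly Long.MIN_VALUE without underflow
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    // Assumes an emission between every two records, so record i sees the watermark of records 0..i-1.
    public static List<Long> watermarksSeenByRecords(long[] values, long outOfOrdernessMillis) {
        BoundedLagGenerator gen = new BoundedLagGenerator(outOfOrdernessMillis);
        List<Long> seen = new ArrayList<>();
        for (long v : values) {
            seen.add(gen.currentWatermark()); // watermark visible when this record arrives
            gen.onEvent(v * 1000L);           // same conversion as the job's timestamp assigner
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] values = {1, 2, 3, 4, 6, 5, 9, 8, 7};
        System.out.println(watermarksSeenByRecords(values, 0L));
        // prints [-9223372036854775808, 999, 1999, 2999, 3999, 5999, 5999, 8999, 8999]
    }
}
```

With an out-of-orderness of 0 the sequence matches the "Current watermark" lines printed in Test 1. Passing 3000 instead gives the formula used in Test 2, although that test's 2000 ms interval breaks the "one emission between records" assumption.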
3. Test 2
3.1. Java code
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class Hi {
    public static void main(String[] args) throws Exception {
        // Create the execution environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Set the automatic watermark interval (update once every 2 seconds)
        env.getConfig().setAutoWatermarkInterval(2000L);
        // Create the stream; event-time watermark strategy: max event time minus 3 seconds
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
        // Read the current watermark through the timer service and print it with each record
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() {
            @Override
            public void processElement(Long aLong, Context context, Collector<String> collector) {
                collector.collect("Current watermark: " + context.timerService().currentWatermark());
                collector.collect("Event time: " + context.timestamp());
                collector.collect("------------------------------------------");
            }
        }).print();
        // Execute the job
        env.execute();
    }

    // Finite test source: emits one value roughly every 1001 ms
    public static class AutomatedSource implements SourceFunction<Long> {
        private volatile boolean running = true;

        public AutomatedSource() {
        }

        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException {
            Long[] ls = {1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L, 10L, 0L, 0L};
            for (Long l : ls) {
                if (!running) {
                    break;
                }
                Thread.sleep(1001L);
                sc.collect(l);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
```
3.2. Test results
```
Current watermark: -9223372036854775808
Event time: 1000
------------------------------------------
Current watermark: -2001
Event time: 2000
------------------------------------------
Current watermark: -2001
Event time: 3000
------------------------------------------
Current watermark: -1
Event time: 4000
------------------------------------------
Current watermark: -1
Event time: 5000
------------------------------------------
Current watermark: 1999
Event time: 6000
------------------------------------------
Current watermark: 1999
Event time: 9000
------------------------------------------
Current watermark: 5999
Event time: 8000
------------------------------------------
Current watermark: 5999
Event time: 7000
------------------------------------------
Current watermark: 5999
Event time: 10000
------------------------------------------
Current watermark: 5999
Event time: 0
------------------------------------------
Current watermark: 6999
Event time: 0
------------------------------------------
```
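3.3. Explanation of the results
- The automatic watermark interval is changed from the default 200 milliseconds to 2000 milliseconds. The source emits one record every 1001 milliseconds, so when the next record arrives, the watermark has not necessarily been updated from the previous record (for example, the records with event times 2000 and 3000 both see -2001).
- The watermark strategy in this code is bounded out-of-orderness with a 3-second bound (`forBoundedOutOfOrderness(Duration.ofSeconds(3))`):
  $\text{Watermark} = \text{max event time} - 3\,\text{s} - 1\,\text{ms}$
  For example, after the record with event time 1000: 1000 - 3000 - 1 = -2001.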
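Why a record can miss the latest watermark here becomes clearer when the interleaving of record arrivals and periodic emissions is modeled in plain Java. This is an idealized sketch under stated assumptions, not Flink's scheduler: record i (counting from 1) arrives at exactly i × 1001 ms, emissions fire at exactly every multiple of 2000 ms starting from 0, and nothing else delays either side. In a real job the source and the watermark timer are not aligned like this, so which emission a record sees may differ between runs. `PeriodicEmitSketch` and `watermarksSeen` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodicEmitSketch {

    // Idealized model (an assumption, not Flink's scheduler): record i (0-based) arrives at
    // (i + 1) * sleepMillis; an emission fires at every positive multiple of intervalMillis and
    // publishes maxEventTime - outOfOrdernessMillis - 1 over the records that have arrived so far.
    // An emission at exactly a record's arrival time is treated as happening after that record.
    // Returns the watermark visible to each record when it arrives.
    public static List<Long> watermarksSeen(long[] eventTimes, long sleepMillis,
                                            long intervalMillis, long outOfOrdernessMillis) {
        List<Long> seen = new ArrayList<>();
        long published = Long.MIN_VALUE;                               // initial watermark
        long maxEventTime = Long.MIN_VALUE + outOfOrdernessMillis + 1; // avoids underflow below
        long nextEmit = intervalMillis;                                // first periodic emission
        for (int i = 0; i < eventTimes.length; i++) {
            long arrival = (i + 1) * sleepMillis;
            // Fire every emission scheduled before this record arrives; they only know earlier records
            while (nextEmit < arrival) {
                published = Math.max(published, maxEventTime - outOfOrdernessMillis - 1);
                nextEmit += intervalMillis;
            }
            seen.add(published);                                  // watermark this record sees in the model
            maxEventTime = Math.max(maxEventTime, eventTimes[i]); // record now known to the generator
        }
        return seen;
    }

    public static void main(String[] args) {
        // Test 2: 1001 ms between records, 2000 ms interval, 3 s out-of-orderness
        long[] test2 = {1000, 2000, 3000, 4000, 5000, 6000, 9000, 8000, 7000, 10000, 0, 0};
        System.out.println(watermarksSeen(test2, 1001L, 2000L, 3000L));
        // prints [-9223372036854775808, -2001, -2001, -1, -1, 1999, 1999, 5999, 5999, 5999, 5999, 6999]

        // Test 1: 201 ms between records, 200 ms interval, no out-of-orderness (monotonous)
        long[] test1 = {1000, 2000, 3000, 4000, 6000, 5000, 9000, 8000, 7000};
        System.out.println(watermarksSeen(test1, 201L, 200L, 0L));
        // prints [-9223372036854775808, 999, 1999, 2999, 3999, 5999, 5999, 8999, 8999]
    }
}
```

Under these assumptions the model happens to reproduce the printed watermark sequence of Test 2. With a 201 ms gap and a 200 ms interval it also reproduces Test 1: any 201 ms window always contains an emission, so every record sees the watermark of all earlier records regardless of how the timer is aligned.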