大数据(9e)Flink定时器
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9e)Flink定时器相关的知识,希望对你有一定的参考价值。
文章目录
环境
WIN10+IDEA2021+JDK1.8+FLINK1.14
基于处理时间的定时器
registerProcessingTimeTimer
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
public class Hi
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//创建流
SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource());
//定时器
d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>()
@Override
public void processElement(Long value, Context ctx, Collector<String> out)
//获取处理时间
long processingTime = ctx.timerService().currentProcessingTime();
//输出
out.collect("当前处理时间" + processingTime);
//注册处理时间定时器(2秒后触发)
ctx.timerService().registerProcessingTimeTimer(processingTime + 2000L);
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
out.collect("触发" + timestamp + "定时器");
).print();
//环境执行
env.execute();
public static class AutomatedSource implements SourceFunction<Long>
public AutomatedSource()
@Override
public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException
for (long l = 0L; l < 99; l++)
Thread.sleep(1000L);
sc.collect(l);
@Override
public void cancel()
基于事件时间的定时器
Setting timers is only supported on a keyed streams.
测试1
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class Hi
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//创建流,确定 事件时间的水位线策略
SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forMonotonousTimestamps()
//.assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
//定时器
d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>()
@Override
public void processElement(Long value, Context ctx, Collector<String> out)
//输出
out.collect("当前水位线 " + ctx.timerService().currentWatermark() + ";事件时间 " + ctx.timestamp());
//注册事件时间定时器(2秒后触发)
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1999L);
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
out.collect("触发" + timestamp + "定时器");
).print();
//环境执行
env.execute();
public static class AutomatedSource implements SourceFunction<Long>
public AutomatedSource()
@Override
public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException
Long[] ls = 1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L;
for (Long l : ls)
Thread.sleep(299L);
sc.collect(l);
@Override
public void cancel()
打印结果
测试2
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class Hi
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//创建流,确定 事件时间的水位线策略
SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
//.assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forMonotonousTimestamps()
.assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
//定时器
d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>()
@Override
public void processElement(Long value, Context ctx, Collector<String> out)
//输出
out.collect("当前水位线 " + ctx.timerService().currentWatermark() + ";事件时间 " + ctx.timestamp());
//注册事件时间定时器(2秒后触发)
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1999L);
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
out.collect("触发" + timestamp + "定时器");
).print();
//环境执行
env.execute();
public static class AutomatedSource implements SourceFunction<Long>
public AutomatedSource()
@Override
public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException
Long[] ls = 1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L;
for (Long l : ls)
Thread.sleep(299L);
sc.collect(l);
@Override
public void cancel()
打印结果
以上是关于大数据(9e)Flink定时器的主要内容,如果未能解决你的问题,请参考以下文章