大数据(9e)Flink定时器

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9e)Flink定时器相关的知识,希望对你有一定的参考价值。

文章目录

环境

WIN10+IDEA2021+JDK1.8+FLINK1.14

基于处理时间的定时器

registerProcessingTimeTimer

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Hi 
    public static void main(String[] args) throws Exception 
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建流
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource());
        //定时器
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() 
            @Override
            public void processElement(Long value, Context ctx, Collector<String> out) 
                //获取处理时间
                long processingTime = ctx.timerService().currentProcessingTime();
                //输出
                out.collect("当前处理时间" + processingTime);
                //注册处理时间定时器(2秒后触发)
                ctx.timerService().registerProcessingTimeTimer(processingTime + 2000L);
            

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) 
                out.collect("触发" + timestamp + "定时器");
            
        ).print();
        //环境执行
        env.execute();
    

    public static class AutomatedSource implements SourceFunction<Long> 
        public AutomatedSource() 

        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException 
            for (long l = 0L; l < 99; l++) 
                Thread.sleep(1000L);
                sc.collect(l);
            
        

        @Override
        public void cancel() 
    

基于事件时间的定时器

Setting timers is only supported on a keyed streams.

测试1

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class Hi 
    public static void main(String[] args) throws Exception 
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建流,确定 事件时间的水位线策略
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forMonotonousTimestamps()
                //.assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
        //定时器
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() 
            @Override
            public void processElement(Long value, Context ctx, Collector<String> out) 
                //输出
                out.collect("当前水位线 " + ctx.timerService().currentWatermark() + ";事件时间 " + ctx.timestamp());
                //注册事件时间定时器(2秒后触发)
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1999L);
            

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) 
                out.collect("触发" + timestamp + "定时器");
            
        ).print();
        //环境执行
        env.execute();
    

    public static class AutomatedSource implements SourceFunction<Long> 
        public AutomatedSource() 

        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException 
            Long[] ls = 1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L;
            for (Long l : ls) 
                Thread.sleep(299L);
                sc.collect(l);
            
        

        @Override
        public void cancel() 
    

打印结果

测试2

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class Hi 
    public static void main(String[] args) throws Exception 
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建流,确定 事件时间的水位线策略
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                //.assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forMonotonousTimestamps()
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));
        //定时器
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() 
            @Override
            public void processElement(Long value, Context ctx, Collector<String> out) 
                //输出
                out.collect("当前水位线 " + ctx.timerService().currentWatermark() + ";事件时间 " + ctx.timestamp());
                //注册事件时间定时器(2秒后触发)
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1999L);
            

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) 
                out.collect("触发" + timestamp + "定时器");
            
        ).print();
        //环境执行
        env.execute();
    

    public static class AutomatedSource implements SourceFunction<Long> 
        public AutomatedSource() 

        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException 
            Long[] ls = 1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L;
            for (Long l : ls) 
                Thread.sleep(299L);
                sc.collect(l);
            
        

        @Override
        public void cancel() 
    

打印结果

以上是关于大数据(9e)Flink定时器的主要内容,如果未能解决你的问题,请参考以下文章

大数据(9e)图解Flink窗口

大数据(9e)图解Flink窗口

大数据(9e)图解Flink窗口

大数据(9e)Flink侧输出流

大数据(9e)Flink侧输出流

大数据(9e)图解Flink窗口