Flink的定时器(EventTime和ProcessTime)

Posted 月疯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的定时器(EventTime和ProcessTime)相关的知识,希望对你有一定的参考价值。

Flink定时器
1、Flink当中定时器Timer的基本用法
定时器Timer是Flink提供的用于感知并利用处理时间、事件时间变化的一种机制,通常在KeyedProcessFunction当中使用定时器Timer,具体步骤:
在processElement()方法当中注册Timer,然后通过重写onTimer()方法作为Timer定时器触发时的回调逻辑。
2、简述在Flink当中注册定时器Timer的两种方法
方式1:通过调用
Context.timerService().registerProcessingTimeTimer()注册定时器,定时器在系统时间达到设定的时间戳时触发;
方式2:通过调用
Context.timerService().registerEventTimeTimer()方法注册定时器,定时器在Flink内部水印达到或者超过Timer设定的时间戳时触发。

eventTimer案例demo:

package Flink_API;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Properties;

public class TestEventTimer 
    public static void main(String[] args) throws Exception 
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        String topic="test_topi_2";
        Properties properties=new Properties();
        properties.setProperty("bootstrap.servers","page1:9002");
        properties.setProperty("group.id","con1");

        FlinkKafkaConsumer010<String> myConsumer =new  FlinkKafkaConsumer010<>(topic, (DeserializationSchema) new SimpleStringSchema(),properties);
        DataStreamSource<String> kafkaTestStream = env.addSource(myConsumer);
        DataStream<String> StreamOperator=kafkaTestStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(30)) 
            @Override
            public long extractTimestamp(String s) 
                String[] splited =s.split(",");
                long event_time=Long.parseLong(splited[1]);
                return event_time;
            
        );
        DataStream<Tuple2<String,Integer>> stream=StreamOperator.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() 
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception 
                String[] splited = s.split("\\\\W+");
                collector.collect(Tuple2.of(splited[0],1));
            
        ).setParallelism(5);

        stream.keyBy(0).process(new CountWithTimeoutFunction());
        env.execute("TestEventTimer");

    
    public static class CountWithTimestamp
        public String key;
        public Integer count;
        public long lastModified;

        public String getKey() 
            return key;
        

        public void setKey(String key) 
            this.key = key;
        

        public Integer getCount() 
            return count;
        

        public void setCount(Integer count) 
            this.count = count;
        

        public long getLastModified() 
            return lastModified;
        

        public void setLastModified(long lastModified) 
            this.lastModified = lastModified;
        

        @Override
        public String toString() 
            return "CountWithTimestamp" +
                    "key='" + key + '\\'' +
                    ", count=" + count +
                    ", lastModified=" + lastModified +
                    '';
        
    
    //eventTime的定时器
    public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple,Tuple2<String,Integer>,Tuple2<String,Integer>> 
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception 
            super.open(parameters);
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState",CountWithTimestamp.class));
        

        @Override
        public void processElement(Tuple2<String, Integer> value, Context context, Collector<Tuple2<String, Integer>> collector) throws Exception 
            CountWithTimestamp current = state.value();
            if(current == null)
                current = new CountWithTimestamp();
                current.key=value.f0;
                current.count = 0;
                current.lastModified = 0;
            

            current.count +=1;
            current.lastModified=context.timestamp();
            state.update(current);

            context.timerService().registerEventTimeTimer(current.lastModified + 60000);
        

        /**
         * 定时器出发时的回调函数
         * @param timestamp 定时器设定的时间,注意:不是watermark的时间。
         * @param ctx
         * @param out
         * @throws Exception
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception 
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
            System.out.print("====>定时器:"+sdf.format(timestamp)+"被处罚!处罚时的watermark是:"+ctx.timerService().currentWatermark());
            CountWithTimestamp result = state.value();
            if(timestamp == result.lastModified + 60000)
                out.collect(Tuple2.of(result.key,result.count));
            

        
    


processTimer的案例:一个商品的价格在10秒(代码中intervalTime为10000毫秒)之内连续上升,就会输出这个商品

 

package Flink_API;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Properties;

public class TestEventTimer 
    public static void main(String[] args) throws Exception 
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(1);

        String topic="test_topi_2";
        Properties properties=new Properties();
        properties.setProperty("bootstrap.servers","page1:9002");
        properties.setProperty("group.id","con1");

        FlinkKafkaConsumer010<String> myConsumer =new  FlinkKafkaConsumer010<>(topic, (DeserializationSchema) new SimpleStringSchema(),properties);
        DataStreamSource<String> kafkaTestStream = env.addSource(myConsumer);
        DataStream<String> StreamOperator=kafkaTestStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(30)) 
            @Override
            public long extractTimestamp(String s) 
                String[] splited =s.split(",");
                long event_time=Long.parseLong(splited[1]);
                return event_time;
            
        );
        DataStream<Tuple2<String,Integer>> stream=StreamOperator.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() 
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception 
                String[] splited = s.split("\\\\W+");
                collector.collect(Tuple2.of(splited[0],1));
            
        ).setParallelism(5);

        stream.keyBy(0).process(new TempIncreWaring(10000)).print();
        env.execute("TestProcessTimer");

    
    public static class CountWithTimestamp
        public String key;
        public Integer count;
        public long lastModified;

        public String getKey() 
            return key;
        

        public void setKey(String key) 
            this.key = key;
        

        public Integer getCount() 
            return count;
        

        public void setCount(Integer count) 
            this.count = count;
        

        public long getLastModified() 
            return lastModified;
        

        public void setLastModified(long lastModified) 
            this.lastModified = lastModified;
        

        @Override
        public String toString() 
            return "CountWithTimestamp" +
                    "key='" + key + '\\'' +
                    ", count=" + count +
                    ", lastModified=" + lastModified +
                    '';
        
    

    //processTime的定时器
    public static class TempIncreWaring extends KeyedProcessFunction<Tuple,Tuple2<String,Integer>,String>
        private Integer intervalTime;
        public TempIncreWaring(Integer intervalTime)
            this.intervalTime=intervalTime;
        
        //定义状态:保存该key对应的上一个价格值进行比较,保存注册定时器时间戳用于删除
        private ValueState<Integer> lastPriceState;
        private ValueState<Long> timerTsState;

        @Override
        public void open(Configuration parameters) throws Exception 
            super.open(parameters);
            lastPriceState=getRuntimeContext().getState(new ValueStateDescriptor<>("myState1",Integer.class,0));
            timerTsState=getRuntimeContext().getState(new ValueStateDescriptor<>("myState2",Long.class,0L));
        

        @Override
        public void processElement(Tuple2<String, Integer> value, Context context, Collector<String> collector) throws Exception 
            Integer lastPrice = lastPriceState.value();
            long timerTs = timerTsState.value();

            //当前价格和上一次价格进行比较
            if(value.f1 > lastPrice && timerTs == 0L)
                long ts =context.timerService().currentProcessingTime() + this.intervalTime;
                context.timerService().registerProcessingTimeTimer(ts);
                //更新定时器时间
                timerTsState.update(ts);
            else if(value.f1 <= lastPrice)
                //如果价格下降,删除定时器
                context.timerService().deleteProcessingTimeTimer(timerTs);
                timerTsState.update(0L);//或者timerTsState.clear()
            
            lastPriceState.update(value.f1);

        

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception 
            System.out.print("------------>");
            out.collect("商品:"+ctx.getCurrentKey()+"的价格,连续:"+this.intervalTime/1000+"秒连续上升");
            timerTsState.clear();//定时器清零
            System.out.print("<--------------------");
        
    

以上是关于Flink的定时器(EventTime和ProcessTime)的主要内容,如果未能解决你的问题,请参考以下文章

Flink基于EventTime和WaterMark处理乱序事件和晚到的数据

Flink基于EventTime和WaterMark处理乱序事件和晚到的数据

Flink基于EventTime和WaterMark处理乱序事件和晚到的数据

Flink基于EventTime和WaterMark处理乱序事件和晚到的数据

flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别

flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别