Flink的定时器(EventTime和ProcessTime)
Posted 月疯
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的定时器(EventTime和ProcessTime)相关的知识,希望对你有一定的参考价值。
Flink定时器
1、Flink当中定时器Timer的基本用法
定时器Timer是Flink提供的用于感知并利用处理时间、事件时间变化的一种机制,通常在KeyedProcessFunction当中使用定时器Timer,具体步骤:
在processElement()方法当中注册Timer,然后通过重写onTimer()方法作为Timer定时器触发时的回调逻辑。
2、简述在Flink当中注册定时器Timer的两种方法
方式1:通过调用
Context.timerService().registerProcessingTimeTimer()方法注册定时器,定时器在系统时间达到设定的时间戳时触发;
方式2:通过调用
Context.timerService().registerEventTimeTimer()方法注册定时器,定时器在Flink内部水印达到或者超过Timer设定的时间戳时触发。
eventTimer案例demo:
package Flink_API;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Properties;
public class TestEventTimer
public static void main(String[] args) throws Exception
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
String topic="test_topi_2";
Properties properties=new Properties();
properties.setProperty("bootstrap.servers","page1:9002");
properties.setProperty("group.id","con1");
FlinkKafkaConsumer010<String> myConsumer =new FlinkKafkaConsumer010<>(topic, (DeserializationSchema) new SimpleStringSchema(),properties);
DataStreamSource<String> kafkaTestStream = env.addSource(myConsumer);
DataStream<String> StreamOperator=kafkaTestStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(30))
@Override
public long extractTimestamp(String s)
String[] splited =s.split(",");
long event_time=Long.parseLong(splited[1]);
return event_time;
);
DataStream<Tuple2<String,Integer>> stream=StreamOperator.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>()
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception
String[] splited = s.split("\\\\W+");
collector.collect(Tuple2.of(splited[0],1));
).setParallelism(5);
stream.keyBy(0).process(new CountWithTimeoutFunction());
env.execute("TestEventTimer");
public static class CountWithTimestamp
public String key;
public Integer count;
public long lastModified;
public String getKey()
return key;
public void setKey(String key)
this.key = key;
public Integer getCount()
return count;
public void setCount(Integer count)
this.count = count;
public long getLastModified()
return lastModified;
public void setLastModified(long lastModified)
this.lastModified = lastModified;
@Override
public String toString()
return "CountWithTimestamp" +
"key='" + key + '\\'' +
", count=" + count +
", lastModified=" + lastModified +
'';
//eventTime的定时器
public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple,Tuple2<String,Integer>,Tuple2<String,Integer>>
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception
super.open(parameters);
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState",CountWithTimestamp.class));
@Override
public void processElement(Tuple2<String, Integer> value, Context context, Collector<Tuple2<String, Integer>> collector) throws Exception
CountWithTimestamp current = state.value();
if(current == null)
current = new CountWithTimestamp();
current.key=value.f0;
current.count = 0;
current.lastModified = 0;
current.count +=1;
current.lastModified=context.timestamp();
state.update(current);
context.timerService().registerEventTimeTimer(current.lastModified + 60000);
/**
* 定时器出发时的回调函数
* @param timestamp 定时器设定的时间,注意:不是watermark的时间。
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
System.out.print("====>定时器:"+sdf.format(timestamp)+"被处罚!处罚时的watermark是:"+ctx.timerService().currentWatermark());
CountWithTimestamp result = state.value();
if(timestamp == result.lastModified + 60000)
out.collect(Tuple2.of(result.key,result.count));
processTimer的案例:一个商品的价格在10秒之内连续上升,就会输出这个商品
package Flink_API;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Properties;
public class TestEventTimer
public static void main(String[] args) throws Exception
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(1);
String topic="test_topi_2";
Properties properties=new Properties();
properties.setProperty("bootstrap.servers","page1:9002");
properties.setProperty("group.id","con1");
FlinkKafkaConsumer010<String> myConsumer =new FlinkKafkaConsumer010<>(topic, (DeserializationSchema) new SimpleStringSchema(),properties);
DataStreamSource<String> kafkaTestStream = env.addSource(myConsumer);
DataStream<String> StreamOperator=kafkaTestStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(30))
@Override
public long extractTimestamp(String s)
String[] splited =s.split(",");
long event_time=Long.parseLong(splited[1]);
return event_time;
);
DataStream<Tuple2<String,Integer>> stream=StreamOperator.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>()
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception
String[] splited = s.split("\\\\W+");
collector.collect(Tuple2.of(splited[0],1));
).setParallelism(5);
stream.keyBy(0).process(new TempIncreWaring(10000)).print();
env.execute("TestProcessTimer");
public static class CountWithTimestamp
public String key;
public Integer count;
public long lastModified;
public String getKey()
return key;
public void setKey(String key)
this.key = key;
public Integer getCount()
return count;
public void setCount(Integer count)
this.count = count;
public long getLastModified()
return lastModified;
public void setLastModified(long lastModified)
this.lastModified = lastModified;
@Override
public String toString()
return "CountWithTimestamp" +
"key='" + key + '\\'' +
", count=" + count +
", lastModified=" + lastModified +
'';
//processTime的定时器
public static class TempIncreWaring extends KeyedProcessFunction<Tuple,Tuple2<String,Integer>,String>
private Integer intervalTime;
public TempIncreWaring(Integer intervalTime)
this.intervalTime=intervalTime;
//定义状态:保存该key对应的上一个价格值进行比较,保存注册定时器时间戳用于删除
private ValueState<Integer> lastPriceState;
private ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception
super.open(parameters);
lastPriceState=getRuntimeContext().getState(new ValueStateDescriptor<>("myState1",Integer.class,0));
timerTsState=getRuntimeContext().getState(new ValueStateDescriptor<>("myState2",Long.class,0L));
@Override
public void processElement(Tuple2<String, Integer> value, Context context, Collector<String> collector) throws Exception
Integer lastPrice = lastPriceState.value();
long timerTs = timerTsState.value();
//当前价格和上一次价格进行比较
if(value.f1 > lastPrice && timerTs == 0L)
long ts =context.timerService().currentProcessingTime() + this.intervalTime;
context.timerService().registerProcessingTimeTimer(ts);
//更新定时器时间
timerTsState.update(ts);
else if(value.f1 <= lastPrice)
//如果价格下降,删除定时器
context.timerService().deleteProcessingTimeTimer(timerTs);
timerTsState.update(0L);//或者timerTsState.clear()
lastPriceState.update(value.f1);
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception
System.out.print("------------>");
out.collect("商品:"+ctx.getCurrentKey()+"的价格,连续:"+this.intervalTime/1000+"秒连续上升");
timerTsState.clear();//定时器清零
System.out.print("<--------------------");
以上是关于Flink的定时器(EventTime和ProcessTime)的主要内容,如果未能解决你的问题,请参考以下文章
Flink基于EventTime和WaterMark处理乱序事件和晚到的数据
Flink基于EventTime和WaterMark处理乱序事件和晚到的数据
Flink基于EventTime和WaterMark处理乱序事件和晚到的数据
Flink基于EventTime和WaterMark处理乱序事件和晚到的数据