Implementing a custom ProcessFunction in Flink: timers and side outputs
Posted by Leo Han
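In Flink, when an operator needs access to the processing time, the watermark, or timers, you can implement a ProcessFunction.
The main variants are KeyedProcessFunction, ProcessFunction, and CoProcessFunction, among others. Their core capabilities are:
- Stateful computation: the operator can access keyed state (on a keyed stream).
- Timers: the function can register timers and is called back when they fire.
- Side outputs: part of the data can be sent to a separate stream, and the two output streams may have different element types.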
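To make the timer capability more concrete, here is a minimal plain-Java sketch (not Flink code) of how event-time timers behave for a single key: a timer is identified by its timestamp, registering the same timestamp twice keeps one timer, deleting removes it, and a timer fires once the watermark reaches its timestamp. `TimerSketch` and its method names are hypothetical and exist only for illustration; in Flink the corresponding calls are `registerEventTimeTimer` and `deleteEventTimeTimer` on the context's `timerService()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for one key's event-time timers; not a Flink class.
final class TimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();

    // Registering the same timestamp twice keeps a single timer.
    void register(long timestamp) {
        timers.add(timestamp);
    }

    // Deleting a timer that was never registered is a no-op.
    void delete(long timestamp) {
        timers.remove(timestamp);
    }

    // Returns, in ascending order, the timers that fire when the watermark
    // advances to the given value, and removes them from the pending set.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerSketch t = new TimerSketch();
        t.register(1500);
        t.register(1500);  // duplicate timestamp, still one timer
        t.register(2000);
        t.delete(2000);    // cancelled before the watermark reaches it
        System.out.println(t.advanceWatermark(1499)); // prints []
        System.out.println(t.advanceWatermark(2500)); // prints [1500]
    }
}
```

The sketch ignores keys and processing-time timers; it only shows why a cancelled timer never produces a callback and why nothing fires until the watermark advances.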
The following is a custom KeyedProcessFunction implementation:
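import java.text.SimpleDateFormat;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MyKeyedProcessFunctionJava extends KeyedProcessFunction<String, StockPrice, String> {
    // Per-key state: timestamp of the pending timer and the last price seen.
    private ValueState<Long> currentTime;
    private ValueState<Double> lastPrice;
    private static final long intervalMs = 500;
    private final OutputTag<StockPrice> highLevel;
    private final OutputTag<StockPrice> middleLevel;
    private final OutputTag<StockPrice> lowLevel;

    public MyKeyedProcessFunctionJava(OutputTag<StockPrice> highLevel,
                                      OutputTag<StockPrice> middleLevel,
                                      OutputTag<StockPrice> lowLevel) {
        this.lowLevel = lowLevel;
        this.middleLevel = middleLevel;
        this.highLevel = highLevel;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        currentTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("currentTime", Long.class));
        lastPrice = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastPrice", Double.class));
    }

    @Override
    public void processElement(StockPrice stockPrice, Context context, Collector<String> collector) throws Exception {
        // Previous price and pending timer timestamp for this key (0 if unset).
        double price = lastPrice.value() != null ? lastPrice.value() : 0;
        long currentTimeStamp = currentTime.value() != null ? currentTime.value() : 0;
        double current = stockPrice.getPrice();
        if (price < current) {
            // A higher price cancels the pending timer.
            context.timerService().deleteEventTimeTimer(currentTimeStamp);
        } else {
            // Otherwise arm a timer intervalMs after this record's event time.
            long time = context.timestamp() + intervalMs;
            context.timerService().registerEventTimeTimer(time);
            currentTime.update(time);
        }
        lastPrice.update(current);
        // Route the record to a side output by its current price.
        if (current > 10000) {
            context.output(highLevel, stockPrice);
        } else if (current > 3000) {
            context.output(middleLevel, stockPrice);
        } else {
            context.output(lowLevel, stockPrice);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Called when the watermark passes a registered timer that was not cancelled by a higher price.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timeStr = format.format(timestamp);
        String warnings = String.format("warnings: time=%s ,key=%s increased", timeStr, ctx.getCurrentKey());
        out.collect(warnings);
    }

    public static void main(String[] args) throws Exception {
        String topic = "test001";
        // Fill in bootstrap.servers, group.id, and so on for your cluster.
        Properties kafkaProps = new Properties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<StockPrice> consumer = new FlinkKafkaConsumer<StockPrice>(topic, new MyKafkaDeserializationSchema(), kafkaProps);
        // Assumption (not in the original post): use the Kafka record timestamps as event time
        // so that watermarks advance and the event-time timers can fire.
        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.<StockPrice>forMonotonousTimestamps());
        // The trailing {} makes each tag an anonymous subclass so Flink can infer the element type.
        OutputTag<StockPrice> highLevel = new OutputTag<StockPrice>("highLevel") {};
        OutputTag<StockPrice> middleLevel = new OutputTag<StockPrice>("middleLevel") {};
        OutputTag<StockPrice> lowLevel = new OutputTag<StockPrice>("lowLevel") {};
        SingleOutputStreamOperator<String> warning = env.addSource(consumer)
            .keyBy(stockPrice -> stockPrice.getId())
            .process(new MyKeyedProcessFunctionJava(highLevel, middleLevel, lowLevel));
        // Side outputs are read from the operator that emitted them.
        DataStream<StockPrice> highLevelStream = warning.getSideOutput(highLevel);
        DataStream<StockPrice> middleLevelStream = warning.getSideOutput(middleLevel);
        DataStream<StockPrice> lowLevelStream = warning.getSideOutput(lowLevel);
        warning.print();
        env.execute();
    }
}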
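The threshold logic that picks a side output is easy to check in isolation, without a running job. Below is a small plain-Java sketch, assuming the routing should depend on the current record's price; `PriceTiers`, its nested `Tier` enum, and `classify` are hypothetical helpers and are not part of Flink or of the job above.

```java
// Hypothetical helper mirroring the thresholds used for the side outputs.
final class PriceTiers {
    enum Tier { HIGH, MIDDLE, LOW }

    static final double HIGH_THRESHOLD = 10000;
    static final double MIDDLE_THRESHOLD = 3000;

    // price > 10000 -> HIGH; 3000 < price <= 10000 -> MIDDLE; otherwise LOW.
    static Tier classify(double price) {
        if (price > HIGH_THRESHOLD) {
            return Tier.HIGH;
        }
        if (price > MIDDLE_THRESHOLD) {
            return Tier.MIDDLE;
        }
        return Tier.LOW;
    }

    public static void main(String[] args) {
        System.out.println(classify(10000.01)); // prints HIGH
        System.out.println(classify(10000));    // prints MIDDLE (upper bound is inclusive)
        System.out.println(classify(3000));     // prints LOW
    }
}
```

Inside the KeyedProcessFunction, a result like this would only decide which OutputTag is passed to `context.output`; keeping the comparison in a plain method makes the boundary cases (exactly 10000, exactly 3000) straightforward to unit test.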