Flink---process处理温度
Posted Shall潇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink---process处理温度相关的知识,希望对你有一定的参考价值。
在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
package Flink.process;
import Flink.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @Author shall潇
* @Date 2021/7/5
* @Description 在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
*/
public class Process_Key_Function {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.159.100", 7777);
SingleOutputStreamOperator<SensorReading> mapStream = dataStreamSource.map(line -> {
String[] split = line.split(",");
SensorReading sensorReading = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
return sensorReading;
});
KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
keyedStream.process(new MyKeyProcessFunction()).print("myProcess");
try {
env.execute("process_1");
} catch (Exception e) {
e.printStackTrace();
}
}
private static class MyKeyProcessFunction extends KeyedProcessFunction<Tuple,SensorReading,Integer> {
// 使用状态后端来保存注册时间,方便后面的删除
ValueState<Long> tsTimeState = null; //状态后端:保存上一个key对应的记录
// 被激活后执行的函数
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
System.out.println("当前时间:"+timestamp+"\\t定时器触发");
tsTimeState.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("vsDesc",Long.class));
}
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<Integer> collector) throws Exception {
Tuple currentKey = context.getCurrentKey();
Long timestamp = context.timestamp();
Long l = context.timerService().currentProcessingTime();
System.out.println(currentKey+":"+timestamp+":"+l);
//在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
if(sensorReading.getTemperature()>38 && tsTimeState.value()==null){
//注册,指定时间过后,执行 onTimer 函数中的内容
context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+10000L);
tsTimeState.update(context.timerService().currentProcessingTime()+10000L);
}else if(sensorReading.getTemperature()<=38 && tsTimeState.value()!=null) {
// 取消注册,不会触发 onTimer 函数中的内容,取消timeService服务
context.timerService().deleteProcessingTimeTimer(tsTimeState.value());
tsTimeState.clear();
}
collector.collect(sensorReading.toString().length());
}
}
}
温度连续上升则报警
只是把上面的代码稍作修改,添加的一个新的状态后端,保存上一个记录的温度
package Flink.process;
import Flink.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @Author shall潇
* @Date 2021/7/6
* @Description 状态后端实现温度连续上升则报警
*/
public class Process2_Key_Function {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.159.100", 7777);
SingleOutputStreamOperator<SensorReading> mapStream = dataStreamSource.map(line -> {
String[] split = line.split(",");
SensorReading sensorReading = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
return sensorReading;
});
KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
keyedStream.process(new MyKeyProcessFunction2(10)).print("myProcess");
try {
env.execute("process-2");
} catch (Exception e) {
e.printStackTrace();
}
}
private static class MyKeyProcessFunction2 extends KeyedProcessFunction<Tuple,SensorReading,String>{
private Integer interval; //设置时间间隔
ValueState<Long> tsTimeState = null; //当前时间戳
ValueState<Double> lastTimeState = null; //存放上一次的温度
MyKeyProcessFunction2(Integer interval){
this.interval = interval;
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
String str = "在"+interval+"间隔内,温度连续升高,Warning !!!\\n";
out.collect(str);
tsTimeState.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("vsDesc",Long.class));
lastTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastDesc",Double.class,Double.MIN_VALUE)); //初始化
}
@Override
public void close() throws Exception {
tsTimeState.clear();
lastTimeState.clear();
}
// TODO
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<String> collector) throws Exception {
if(tsTimeState.value()==null && sensorReading.getTemperature()>lastTimeState.value()){
//当前温度大于上一次的温度,则注册
long ts = context.timerService().currentProcessingTime();
long timeServiceLong = ts + this.interval*1000L;
context.timerService().registerProcessingTimeTimer(timeServiceLong);
tsTimeState.update(timeServiceLong);
}else if(tsTimeState.value()!=null && sensorReading.getTemperature()>lastTimeState.value()){
}else if(tsTimeState.value()!=null && sensorReading.getTemperature()<=lastTimeState.value()){
//当温度小于上一次的温度,则取消注册
context.timerService().deleteProcessingTimeTimer(tsTimeState.value());
tsTimeState.clear();
}
lastTimeState.update(sensorReading.getTemperature());
}
}
}
以上是关于Flink---process处理温度的主要内容,如果未能解决你的问题,请参考以下文章