Flink---process处理温度
Posted Shall潇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink---process处理温度相关的知识,希望对你有一定的参考价值。
文章目录
在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
package Flink.process;
import Flink.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @Author shall潇
* @Date 2021/7/5
* @Description 在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
*/
public class Process_Key_Function
public static void main(String[] args)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.159.100", 7777);
SingleOutputStreamOperator<SensorReading> mapStream = dataStreamSource.map(line ->
String[] split = line.split(",");
SensorReading sensorReading = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
return sensorReading;
);
KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
keyedStream.process(new MyKeyProcessFunction()).print("myProcess");
try
env.execute("process_1");
catch (Exception e)
e.printStackTrace();
private static class MyKeyProcessFunction extends KeyedProcessFunction<Tuple,SensorReading,Integer>
// 使用状态后端来保存注册时间,方便后面的删除
ValueState<Long> tsTimeState = null; //状态后端:保存上一个key对应的记录
// 被激活后执行的函数
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception
System.out.println("当前时间:"+timestamp+"\\t定时器触发");
tsTimeState.clear();
@Override
public void open(Configuration parameters) throws Exception
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("vsDesc",Long.class));
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<Integer> collector) throws Exception
Tuple currentKey = context.getCurrentKey();
Long timestamp = context.timestamp();
Long l = context.timerService().currentProcessingTime();
System.out.println(currentKey+":"+timestamp+":"+l);
//在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
if(sensorReading.getTemperature()>38 && tsTimeState.value()==null)
//注册,指定时间过后,执行 onTimer 函数中的内容
context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+10000L);
tsTimeState.update(context.timerService().currentProcessingTime()+10000L);
else if(sensorReading.getTemperature()<=38 && tsTimeState.value()!=null)
// 取消注册,不会触发 onTimer 函数中的内容,取消timeService服务
context.timerService().deleteProcessingTimeTimer(tsTimeState.value());
tsTimeState.clear();
collector.collect(sensorReading.toString().length());
温度连续上升则报警
只是把上面的代码稍作修改,添加了一个新的状态(ValueState),用来保存上一条记录的温度
package Flink.process;
import Flink.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @Author shall潇
* @Date 2021/7/6
* @Description 状态后端实现温度连续上升则报警
*/
public class Process2_Key_Function
public static void main(String[] args)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.159.100", 7777);
SingleOutputStreamOperator<SensorReading> mapStream = dataStreamSource.map(line ->
String[] split = line.split(",");
SensorReading sensorReading = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
return sensorReading;
);
KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
keyedStream.process(new MyKeyProcessFunction2(10)).print("myProcess");
try
env.execute("process-2");
catch (Exception e)
e.printStackTrace();
private static class MyKeyProcessFunction2 extends KeyedProcessFunction<Tuple,SensorReading,String>
private Integer interval; //设置时间间隔
ValueState<Long> tsTimeState = null; //当前时间戳
ValueState<Double> lastTimeState = null; //存放上一次的温度
MyKeyProcessFunction2(Integer interval)
this.interval = interval;
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception
String str = "在"+interval+"间隔内,温度连续升高,Warning !!!\\n";
out.collect(str);
tsTimeState.clear();
@Override
public void open(Configuration parameters) throws Exception
tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("vsDesc",Long.class));
lastTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastDesc",Double.class,Double.MIN_VALUE)); //初始化
@Override
public void close() throws Exception
tsTimeState.clear();
lastTimeState.clear();
// TODO
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<String> collector) throws Exception
if(tsTimeState.value()==null && sensorReading.getTemperature()>lastTimeState.value())
//当前温度大于上一次的温度,则注册
long ts = context.timerService().currentProcessingTime();
long timeServiceLong = ts + this.interval*1000L;
context.timerService().registerProcessingTimeTimer(timeServiceLong);
tsTimeState.update(timeServiceLong);
else if(tsTimeState.value()!=null && sensorReading.getTemperature()>lastTimeState.value())
else if(tsTimeState.value()!=null && sensorReading.getTemperature()<=lastTimeState.value())
//当温度小于上一次的温度,则取消注册
context.timerService().deleteProcessingTimeTimer(tsTimeState.value());
tsTimeState.clear();
lastTimeState.update(sensorReading.getTemperature());
以上是关于Flink---process处理温度的主要内容,如果未能解决你的问题,请参考以下文章