Flink---process处理温度

Posted Shall潇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink---process处理温度相关的知识,希望对你有一定的参考价值。

文章目录

如果某个传感器的温度在10秒内一直超过38℃,则在10秒后报警;期间只要出现一条不高于38℃的数据,就取消本次报警。

package Flink.process;

import Flink.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @Author shall潇
 * @Date 2021/7/5
 * @Description    在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
 */
public class Process_Key_Function 
    public static void main(String[] args) 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.159.100", 7777);

        SingleOutputStreamOperator<SensorReading> mapStream = dataStreamSource.map(line -> 
            String[] split = line.split(",");
            SensorReading sensorReading = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sensorReading;
        );

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
        keyedStream.process(new MyKeyProcessFunction()).print("myProcess");

        try 
            env.execute("process_1");
         catch (Exception e) 
            e.printStackTrace();
        
    

    private static class MyKeyProcessFunction extends KeyedProcessFunction<Tuple,SensorReading,Integer> 

        // 使用状态后端来保存注册时间,方便后面的删除
        ValueState<Long> tsTimeState = null;  //状态后端:保存上一个key对应的记录

        // 被激活后执行的函数
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception 
            System.out.println("当前时间:"+timestamp+"\\t定时器触发");
            tsTimeState.clear();
        

        @Override
        public void open(Configuration parameters) throws Exception 
            tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("vsDesc",Long.class));
        

        @Override
        public void processElement(SensorReading sensorReading, Context context, Collector<Integer> collector) throws Exception 
            Tuple currentKey = context.getCurrentKey();
            Long timestamp = context.timestamp();
            Long l = context.timerService().currentProcessingTime();
            System.out.println(currentKey+":"+timestamp+":"+l);

            //在10秒内如果一直超过38℃,10秒后报警,如果有低于38℃的数据,则不报警
            if(sensorReading.getTemperature()>38 && tsTimeState.value()==null)
                //注册,指定时间过后,执行 onTimer 函数中的内容
                context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+10000L);
                tsTimeState.update(context.timerService().currentProcessingTime()+10000L);
            else if(sensorReading.getTemperature()<=38 && tsTimeState.value()!=null) 
                // 取消注册,不会触发 onTimer 函数中的内容,取消timeService服务
                context.timerService().deleteProcessingTimeTimer(tsTimeState.value());
                tsTimeState.clear();
            
            collector.collect(sensorReading.toString().length());
        
    

温度连续上升则报警

只是把上面的代码稍作修改,添加了一个新的状态(ValueState),用来保存上一条记录的温度。

package Flink.process;

import Flink.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @Author shall潇
 * @Date 2021/7/6
 * @Description     状态后端实现温度连续上升则报警
 */
public class Process2_Key_Function 
    public static void main(String[] args) 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.159.100", 7777);
        SingleOutputStreamOperator<SensorReading> mapStream = dataStreamSource.map(line -> 
            String[] split = line.split(",");
            SensorReading sensorReading = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sensorReading;
        );

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
        keyedStream.process(new MyKeyProcessFunction2(10)).print("myProcess");

        try 
            env.execute("process-2");
         catch (Exception e) 
            e.printStackTrace();
        
    

    private static class MyKeyProcessFunction2 extends KeyedProcessFunction<Tuple,SensorReading,String>
        private Integer interval;                       //设置时间间隔
        ValueState<Long> tsTimeState = null;            //当前时间戳
        ValueState<Double> lastTimeState = null;        //存放上一次的温度

        MyKeyProcessFunction2(Integer interval)
            this.interval = interval;
        

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception 
            String str = "在"+interval+"间隔内,温度连续升高,Warning !!!\\n";
            out.collect(str);
            tsTimeState.clear();
        

        @Override
        public void open(Configuration parameters) throws Exception 
            tsTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("vsDesc",Long.class));
            lastTimeState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastDesc",Double.class,Double.MIN_VALUE));  //初始化
        

        @Override
        public void close() throws Exception 
            tsTimeState.clear();
            lastTimeState.clear();
        

        // TODO
        @Override
        public void processElement(SensorReading sensorReading, Context context, Collector<String> collector) throws Exception 
            if(tsTimeState.value()==null && sensorReading.getTemperature()>lastTimeState.value())
                //当前温度大于上一次的温度,则注册
                long ts = context.timerService().currentProcessingTime();
                long timeServiceLong = ts + this.interval*1000L;
                context.timerService().registerProcessingTimeTimer(timeServiceLong);
                tsTimeState.update(timeServiceLong);

            else if(tsTimeState.value()!=null && sensorReading.getTemperature()>lastTimeState.value())

            else if(tsTimeState.value()!=null && sensorReading.getTemperature()<=lastTimeState.value())
                //当温度小于上一次的温度,则取消注册
                context.timerService().deleteProcessingTimeTimer(tsTimeState.value());
                tsTimeState.clear();
            
            lastTimeState.update(sensorReading.getTemperature());
        
    

以上是关于Flink---process处理温度的主要内容,如果未能解决你的问题,请参考以下文章

Flink---process处理温度

idea 下flink Process finished with exit code 130

读取树莓派4B处理器(CPU)的实时温度

温度转换异常处理

温度转换异常处理

西门子博途编程-模拟量断线超量程处理(中值法)