Flink---窗口函数

Posted Shall潇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink---窗口函数相关的知识,希望对你有一定的参考价值。

各种窗口

import Flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class Win_Offset 
    public static void main(String[] args) 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> 
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr;
        );

//        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = mapStream.keyBy("id")
//                .timeWindow(Time.seconds(10));                     //滚动窗口
//                .timeWindow(Time.seconds(10),Time.seconds(5));     //滑动窗口
//                .countWindow(12);                 //固定大小窗口
//                .countWindow(12,4);
//                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10000)));  //静态会话窗口
//                .window(EventTimeSessionWindows.withDynamicGap());
//                .window(TumblingEventTimeWindows.of(Time.seconds(15)));       //滚动窗口和上面等价,但是这个可以添加时区
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));  //滑动窗口


/*
        SingleOutputStreamOperator<SensorReading> resultMaxStream = windowedStream.max("temperature");
        resultMaxStream.print("max");*/

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");

        SingleOutputStreamOperator<SensorReading> reduceStream = keyedStream.countWindow(6, 2)
                .reduce(new ReduceFunction<SensorReading>() 
                    @Override
                    public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception 
                        return new SensorReading(
                                sensorReading.getId(),
                                sensorReading.getTimestamp(),
                                sensorReading.getTemperature() + t1.getTemperature()
                        );
                    
                );

        reduceStream.print("countWindow");

        try 
            env.execute("滑动窗口,滚动窗口");
         catch (Exception e) 
            e.printStackTrace();
        

    

aggregate

import Flink.beans.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * @Author shall潇
 * @Date 2021/6/30
 * @Description     求一段时间内的温度的平均值:aggregate
 */
public class Win_Avg 
    public static void main(String[] args) 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> 
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr;
        );

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");

        keyedStream.countWindow(6,2)
                .aggregate(new AggregateFunction<SensorReading, Tuple2<Double,Integer>, Double>() 
                    @Override
                    // 初始化
                    public Tuple2<Double, Integer> createAccumulator() 
                        return new Tuple2<>(0.0,0);     // 温度,个数
                    

                    @Override
                    // 每进来一个数据,进行温度累加,个数累加
                    public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> doubleIntegerTuple2) 
                        Double temp = sensorReading.getTemperature()+doubleIntegerTuple2.f0;
                        Integer count = doubleIntegerTuple2.f1+1;
                        return new Tuple2<>(temp,count);
                    

                    @Override
                    // 统计平均值
                    public Double getResult(Tuple2<Double, Integer> doubleIntegerTuple2) 
                        Double avgValue = doubleIntegerTuple2.f0/doubleIntegerTuple2.f1;
                        return avgValue;
                    

                    @Override
                    // 组间合并
                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> doubleIntegerTuple2, Tuple2<Double, Integer> acc1) 
                        Double sumTemp = doubleIntegerTuple2.f0+acc1.f0;
                        Integer cnt = doubleIntegerTuple2.f1+acc1.f1;
                        return new Tuple2<>(sumTemp,cnt);
                    
                );

        try 
            env.execute("avg-window");
         catch (Exception e) 
            e.printStackTrace();
        
    

apply

import Flink.beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Win_4 
    public static void main(String[] args) 

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  // 默认为事件事件
        // 配置kafka
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gro5");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> 
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr;
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2))   //设置延迟时间
            @Override
            public long extractTimestamp(SensorReading sensorReading) 
                return sensorReading.getTimestamp() * 1000L;
            
        ).setParallelism(3);

        SingleOutputStreamOperator<Tuple4<String, Long, Long, Integer>> appStream = mapStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                // 当窗口关闭的时候执行计算,和 reduce,agg不一样,它们是来一条执行一条
                .apply(new WindowFunction<SensorReading, Tuple4<String, Long, Long, Integer>, Tuple, TimeWindow>() 
                    @Override
                    // tuple : 存放的是 key,timeWindow:窗口信息,iterable:当前窗口内所有数据,
                    public void apply(Tuple tuple,
                                      TimeWindow timeWindow,
                                      Iterable<SensorReading> iterable,
                                      Collector<Tuple4<String, Long, Long, Integer>> collector) throws Exception 
                        String key = tuple.getField(0);  // key   sensor_1
                        long start = timeWindow.getStart();
                        long end = timeWindow.getEnd();
                        Iterator<SensorReading> iterator = iterable.iterator();
                        List list = IteratorUtils.toList(iterator);
                        int size = list.size();

                        Tuple4<String, Long, Long, Integer> returnValue = new Tuple4<>(key, start, end, size);
                        collector.collect(returnValue);     //返回
                    
                );

        dss.print("input");

        appStream.print("apply");

        try 
            env.execute("flink-windows");
         catch (Exception e) 
            e.printStackTrace();
        
    

以上是关于Flink---窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

Flink---窗口函数

Flink---窗口函数

Flink---窗口函数

Flink 窗口处理函数 WindowFunction

Flink 窗口处理函数 WindowFunction

flink sql client 实战:窗口函数----flink-1.13.6