Flink---Window Functions

Posted by Shall潇

Various window types
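
All of the examples below import Flink.beans.SensorReading, a bean that is not shown in the original post. A minimal sketch of such a POJO, with field names inferred from the getters used later (the real class may differ):

// A minimal sketch of the SensorReading bean assumed by the examples below.
// Field names are inferred from the getters used later; the original class is not shown.
public class SensorReading {
    private String id;
    private Long timestamp;      // epoch seconds in these examples
    private Double temperature;

    // Flink's POJO rules require a public no-arg constructor plus getters/setters
    public SensorReading() {}

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }
}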

import Flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class Win_Offset {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        // parse "id,timestamp,temperature" lines from Kafka into SensorReading objects
        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> {
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr;
        });

//        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = mapStream.keyBy("id")
//                .timeWindow(Time.seconds(10));                     // tumbling time window
//                .timeWindow(Time.seconds(10), Time.seconds(5));    // sliding time window
//                .countWindow(12);                                  // tumbling count window (fixed size)
//                .countWindow(12, 4);                               // sliding count window
//                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10000)));   // session window with a static gap
//                .window(EventTimeSessionWindows.withDynamicGap(/* SessionWindowTimeGapExtractor */));  // session window with a per-element dynamic gap
//                .window(TumblingEventTimeWindows.of(Time.seconds(15)));       // tumbling event-time window; same as timeWindow above, but the of(size, offset) overload also takes an offset (e.g. for time zones)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));  // tumbling processing-time window


/*
        SingleOutputStreamOperator<SensorReading> resultMaxStream = windowedStream.max("temperature");
        resultMaxStream.print("max");*/

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");

        // sliding count window: fires every 2 elements per key, reducing over the last 6
        SingleOutputStreamOperator<SensorReading> reduceStream = keyedStream.countWindow(6, 2)
                .reduce(new ReduceFunction<SensorReading>() {
                    @Override
                    public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {
                        // keep the first id/timestamp, sum the temperatures
                        return new SensorReading(
                                sensorReading.getId(),
                                sensorReading.getTimestamp(),
                                sensorReading.getTemperature() + t1.getTemperature()
                        );
                    }
                });

        reduceStream.print("countWindow");

        try {
            env.execute("滑动窗口,滚动窗口");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
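
The countWindow(6, 2) call above is a sliding count window: it triggers every 2 elements per key and evaluates over the last 6. Separately, the timeWindow shorthand in the commented-out block was deprecated in later Flink releases in favor of explicit window assigners; a sketch of the equivalent calls (SlidingProcessingTimeWindows needs an extra import from the same assigners package):

// Sketch: explicit assigners equivalent to the timeWindow shorthands above
// (the shorthand was deprecated in newer Flink releases; exact version may vary).
WindowedStream<SensorReading, Tuple, TimeWindow> tumbling =
        mapStream.keyBy("id")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
WindowedStream<SensorReading, Tuple, TimeWindow> sliding =
        mapStream.keyBy("id")
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));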

aggregate

import Flink.beans.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * @Author shall潇
 * @Date 2021/6/30
 * @Description     Compute the average temperature over a window with aggregate
 */
public class Win_Avg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> {
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr;
        });

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");

        // sliding count window (size 6, slide 2); aggregate computes the average incrementally
        SingleOutputStreamOperator<Double> avgStream = keyedStream.countWindow(6, 2)
                .aggregate(new AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>() {
                    // initialize the accumulator: (temperature sum, element count)
                    @Override
                    public Tuple2<Double, Integer> createAccumulator() {
                        return new Tuple2<>(0.0, 0);
                    }

                    // for each incoming element, add its temperature to the sum and bump the count
                    @Override
                    public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
                        Double temp = sensorReading.getTemperature() + accumulator.f0;
                        Integer count = accumulator.f1 + 1;
                        return new Tuple2<>(temp, count);
                    }

                    // emit the result: average = sum / count
                    @Override
                    public Double getResult(Tuple2<Double, Integer> accumulator) {
                        return accumulator.f0 / accumulator.f1;
                    }

                    // merge two partial accumulators (only called by merging window assigners, e.g. session windows)
                    @Override
                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> acc0, Tuple2<Double, Integer> acc1) {
                        return new Tuple2<>(acc0.f0 + acc1.f0, acc0.f1 + acc1.f1);
                    }
                });

        avgStream.print("avg");

        try {
            env.execute("avg-window");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
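
A note on merge(): it is only invoked by merging window assigners, i.e. session windows; for count, tumbling, and sliding windows it never runs. A sketch of attaching the same kind of aggregate to a session window (the 10-second gap is an arbitrary choice, and event-time sessions also require timestamps/watermarks to be assigned, as in the apply example below):

// Sketch: merge() only fires for merging assigners such as session windows.
// Requires: import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
// and:      import org.apache.flink.streaming.api.windowing.time.Time;
keyedStream
        .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
        .aggregate(/* the same AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> as above */)
        .print("session-avg");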

apply

import Flink.beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Win_4 {
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  // use event time (processing time is the default)
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gro5");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> {
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr;
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {  // allow events up to 2s out of order
            @Override
            public long extractTimestamp(SensorReading sensorReading) {
                return sensorReading.getTimestamp() * 1000L;    // timestamps arrive in seconds; Flink expects milliseconds
            }
        }).setParallelism(3);

        SingleOutputStreamOperator<Tuple4<String, Long, Long, Integer>> appStream = mapStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                // unlike reduce/aggregate, which process elements as they arrive,
                // apply runs once over the whole window content when the window fires
                .apply(new WindowFunction<SensorReading, Tuple4<String, Long, Long, Integer>, Tuple, TimeWindow>() {
                    // tuple: the key; timeWindow: window metadata; iterable: all elements in the current window
                    @Override
                    public void apply(Tuple tuple,
                                      TimeWindow timeWindow,
                                      Iterable<SensorReading> iterable,
                                      Collector<Tuple4<String, Long, Long, Integer>> collector) throws Exception {
                        String key = tuple.getField(0);  // the key, e.g. "sensor_1"
                        long start = timeWindow.getStart();
                        long end = timeWindow.getEnd();
                        Iterator<SensorReading> iterator = iterable.iterator();
                        List list = IteratorUtils.toList(iterator);
                        int size = list.size();     // number of elements in this window

                        Tuple4<String, Long, Long, Integer> returnValue = new Tuple4<>(key, start, end, size);
                        collector.collect(returnValue);     // emit (key, windowStart, windowEnd, count)
                    }
                });

        dss.print("input");

        appStream.print("apply");

        try {
            env.execute("flink-windows");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
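
The start and end values emitted above follow from Flink's epoch-aligned windows: with no offset, a tumbling window of size s puts a timestamp ts into the window starting at ts - (ts % s). A quick worked check with illustrative values:

// Worked example of tumbling-window alignment (illustrative values only):
long ts = 1547718199000L;          // event time in ms (epoch seconds * 1000, as in extractTimestamp)
long size = 15000L;                // 15-second window, matching timeWindow(Time.seconds(15))
long start = ts - (ts % size);     // 1547718195000 -> window start
long end = start + size;           // 1547718210000 -> window end (exclusive)
// so this event lands in the window [1547718195000, 1547718210000)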
