11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动

Posted 涂作权的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动相关的知识,希望对你有一定的参考价值。

本文来自:Flink1.12-2021黑马程序员贺岁视频 的学习笔记

11.Flink四大基石
12.Window
12.1.窗口的分类
12.2.API
12.2.1.window和windowAll
12.2.2.API调用示例
12.2.3.Flink提供了很多各种场景用的WindowAssigner
12.3.代码演示–基于时间的滚动和滑动–掌握
12.3.1.本地ubuntu下安装nc
12.3.2.基于时间的滚动和滑动窗口
12.4.代码演示-基于数量的滚动和滑动
12.5.代码演示–Session会话窗口

11.Flink四大基石


Checkpoint
基于Chandy-Lamport算法,实现了分布式一致性快照,提供了一致性的语义。

State
丰富的State API。ValueState, ListState, MapState, BroadcastState。

Time
实现了Watermark机制。乱序数据处理,迟到数据容忍。

Window
开箱即用的滚动、滑动、会话窗口。以及灵活的自定义窗口。

12.Window

12.1.窗口的分类



time-window: 时间窗口:根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据。
count-window: 数据窗口:根据数量划分窗口,如:每xx个数据统计最近xx个数据。

总结:
按照上面窗口的分类方式进行组合,可以得出如下的窗口:
1.基于时间的滚动窗口tumbling-time-window --用的较多。
2.基于时间的滑动窗口sliding-time-window–用的较多。
3.基于数量的滚动窗口tumbling-count-window – 用的较少。
4.基于数量的滑动窗口sliding-count-window–用的较少。
注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上一个窗口的计算。

12.2.API

12.2.1.window和windowAll

12.2.2.API调用示例

12.2.3.Flink提供了很多各种场景用的WindowAssigner

12.3.代码演示–基于时间的滚动和滑动–掌握

12.3.1.本地ubuntu下安装nc

转自:https://blog.csdn.net/qq_30653631/article/details/93749505

首先需要知道的一点是Ubuntu上默认安装的是netcat-openbsd,而不是经典的netcat-traditional. 因此,我们要想使用netcat-traditional则需要自己进行安装与配置,其详细过程如下所示:

sudo apt-get -y install netcat-traditional

sudo update-alternatives --config nc

这一步要在root下运行,选择对应编号,我这里是0

12.3.2.基于时间的滚动和滑动窗口

需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滚动窗口。
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滑动窗口。

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Desc 演示基于时间的滚动和滑动窗口
 *
 * @author tuzuoquan
 * @date 2022/4/26 18:39
 */
public class WindowDemo_1_2 

    public static void main(String[] args) throws Exception 
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("localhost", 9999);
        //TODO 2.transformation
        SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() 
            @Override
            public CartInfo map(String value) throws Exception 
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            
        );

        //注意: 需求中要求的是各个路口/红绿灯的结果,所以需要先分组
        //carDS.keyBy(car->car.getSensorId())
        KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);
        //需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
        //keyedDS.timeWindow(Time.seconds(5))
        SingleOutputStreamOperator<CartInfo> result1 = keyedDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");
        //需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
        SingleOutputStreamOperator<CartInfo> result2 = keyedDS
                //of(Time size, Time slide)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .sum("count");

        //TODO 3.sink
        //result1.print();
        result2.print();
        /*
        1,5
        2,5
        3,5
        4,5
        */

        //TODO 4.execute
        env.execute();
    

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo 
        private String sensorId;   //信号灯id
        private Integer count;     //通过该信号灯的车的数量
    


nc中的内容:

nc -lk 9999
1,5
2,5
3,5
4,5
6,6

结果:

4> WindowDemo_1_2.CartInfo(sensorId=1, count=5)
4> WindowDemo_1_2.CartInfo(sensorId=1, count=5)
3> WindowDemo_1_2.CartInfo(sensorId=3, count=5)
2> WindowDemo_1_2.CartInfo(sensorId=2, count=5)
2> WindowDemo_1_2.CartInfo(sensorId=2, count=5)
3> WindowDemo_1_2.CartInfo(sensorId=3, count=5)
1> WindowDemo_1_2.CartInfo(sensorId=4, count=5)
2> WindowDemo_1_2.CartInfo(sensorId=6, count=6)
1> WindowDemo_1_2.CartInfo(sensorId=4, count=5)
2> WindowDemo_1_2.CartInfo(sensorId=6, count=6)

12.4.代码演示-基于数量的滚动和滑动

需求
需求1:统计最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口。
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口。

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc 演示基于数量的滚动和滑动窗口
 *
 * @author tuzuoquan
 * @date 2022/4/28 9:30
 */
public class WindowDemo_3_4 

    public static void main(String[] args) throws Exception 
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        //TODO 2.transformation
        SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() 
            @Override
            public CartInfo map(String value) throws Exception 
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            
        );

        //注意:需求中要求的是各个路口/红绿灯的结果,所以需要先分组
        //carDS.keyBy(car->car.getSensorId())
        KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);

        //需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        SingleOutputStreamOperator<CartInfo> result1 = keyedDS
                .countWindow(5)
                .sum("count");

        //需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
        SingleOutputStreamOperator<CartInfo> result2 = keyedDS
                .countWindow(5,3)
                .sum("count");
        //TODO 3.sink
        //result1.print();

        result2.print();

        //TODO 4.execute
        env.execute();
    

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo 
        // 信号灯id
        private String sensorId;
        //通过该信号灯的车的数量
        private Integer count;
    


输入值:

输出结果:

4> WindowDemo_3_4.CartInfo(sensorId=1, count=3)
4> WindowDemo_3_4.CartInfo(sensorId=1, count=11)
4> WindowDemo_3_4.CartInfo(sensorId=1, count=19)

12.5.代码演示–Session会话窗口

需求:
设置会话超时时间为10s, 10s内没有数据到来,则触发上个窗口的计算。

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * TODO
 *
 * @author tuzuoquan
 * @date 2022/4/28 23:24
 */
public class WindowDemo_5 

    public static void main(String[] args) throws Exception 
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        //TODO 2.transformation
        SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() 
            @Override
            public CartInfo map(String value) throws Exception 
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            
        );

        //注意: 需求中要求的是各个路口/红绿灯的结果,所以需要先分组
        //carDS.keyBy(car->car.getSensorId())
        KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);

        //需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)
        SingleOutputStreamOperator<CartInfo> result =
                keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");

        //TODO 3.sink
        result.print();

        /**
         * 1,1
         * 1,1
         * 2,1
         * 2,1
         */

        //TODO 4.execute
        env.execute();
    

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo 
        private String sensorId;   //信号灯id
        private Integer count;     //通过该信号灯的车的数量
    


toto@HIH-L-9511:~$ nc -lk 9999
1,1
1,2
2,3
4,1
4,2
3,5

输出结果:
4> WindowDemo_5.CartInfo(sensorId=1, count=3)
2> WindowDemo_5.CartInfo(sensorId=2, count=3)
1> WindowDemo_5.CartInfo(sensorId=4, count=3)
3> WindowDemo_5.CartInfo(sensorId=3, count=5)

以上是关于11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动的主要内容,如果未能解决你的问题,请参考以下文章

Flink1.12-四大基石详解

11-flink-1.10.1- Flink window API

11-flink-1.10.1- Flink window API

从0到1Flink的成长之路(十七)-高级特性(Flink四大基石)

day04_Flink高级API

Flink核心篇,四大基石容错机制广播反压序列化内存管理资源管理...