Flink 滚动窗口滑动窗口详解

Posted 响彻天堂丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 滚动窗口滑动窗口详解相关的知识,希望对你有一定的参考价值。

1 滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。

2 滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。

既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代
表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表
了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据
个数定义。


我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决
定。如图 6-18 所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口里。比如我们定义的窗口长度为 1 小时、滑动步长为 30 分钟,那么对于 8 点 55 分的数据,应该同时属于[8 点, 9 点)和[8 点半, 9 点半)两个窗口;而对于 8 点 10 分的数据,则同时属于[8点, 9 点)和[7 点半, 8 点半)两个窗口。所以,滑动窗口其实是固定大小窗口的更广义的一种形式;换句话说,滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。当然,我们也可以定义滑动步长大于窗口大小,这样的话就会出现窗口不重叠、但会有间隔的情况;这时有些数据不
属于任何一个窗口,就会出现遗漏统计。所以一般情况下,我们会让滑动步长小于窗口大小,并尽量设置为整数倍的关系。

在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常报警。这时滑动窗口无疑就是很好的实现方式。

3 窗口API

3.1 按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时
执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...) .window(...)

3.2 非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。

在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

stream.windowAll(...)

3.3 代码中窗口 API 的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。

3.4 滚动处理时间窗口

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量,用这个偏移量可以处理时区。

例如:我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了。

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

3.5 滑动处理时间窗口

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)

4 窗口函数

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗
口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream。

4.1 增量聚合函数(incremental aggregation functions)

4.1.1 归约函数(ReduceFunction)

将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。

统计每一小时用户的访问量:

package com.rosh.flink.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
public class UserPojo 

    private Integer userId;

    private String name;

    private String uri;

    private Long timestamp;


package com.rosh.flink.wartermark;

import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WindowTS 

    public static void main(String[] args) throws Exception 

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<UserPojo> dataDS = env.fromCollection(getUserLists());

        //生成有序水位线
        SingleOutputStreamOperator<UserPojo> orderStreamDS = dataDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() 
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) 
                        return element.getTimestamp();
                    
                ));

        //聚合
        SingleOutputStreamOperator<Tuple2<Integer, Long>> userDS = orderStreamDS.map(new MapFunction<UserPojo, Tuple2<Integer, Long>>() 
            @Override
            public Tuple2<Integer, Long> map(UserPojo value) throws Exception 
                return Tuple2.of(value.getUserId(), 1L);
            
        );

        //开窗统计每1小时用户访问了多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> resultDS = userDS.keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .reduce(new ReduceFunction<Tuple2<Integer, Long>>() 
                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception 
                        value1.f1 = value1.f1 + value2.f1;
                        return value1;
                    
                );

        resultDS.print();

        env.execute("WarterMarkTest");

    

    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException 
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();
        for (int i = 1; i <= 1000; i++) 
            String uri = "/goods/" + i;
            int userId = random.nextInt(10);
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, (long) (1000 * i));
            //无序时间
            lists.add(userPojo);
        
        return lists;
    


4.1.2 聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数
据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。

例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。

于是自然可以想到,如果取消类型一致的限制,让输入数据、中间状态、输出结果三者类型都可以不同,不就可以一步直接搞定了吗?Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:


/**
 *
 *  The type of the values that are aggregated (input values)
 *	The type of the accumulator (intermediate aggregate state).
 *	The type of the aggregated result
 *
 */
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 

	   /**
      * 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
			*/		
		 ACC createAccumulator();

     /**
	    * 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;
		  * 返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
	    */
		 ACC add(IN value, ACC accumulator);

		
		/**
	   * 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均
	   * 值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
	   */
		 OUT getResult(ACC accumulator);

		/**
	   * 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景
	   * 就是会话窗口(Session Windows)。
	   */
		 ACC merge(ACC a, ACC b);

所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的
结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输
出的类型可以不同,使得应用更加灵活方便。

·统计人均访问次数:

package com.rosh.flink.wartermark;

import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.*;

public class AggWindowTest 

    public static void main(String[] args) throws Exception 

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<UserPojo> userDS = env.fromCollection(getUserLists()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() 
                            @Override
                            public long extractTimestamp(UserPojo element, long recordTimestamp) 
                                return element.getTimestamp();
                            
                        )
        );

        //统计5秒内,人均访问次数
        SingleOutputStreamOperator<Double> resultDS = userDS.keyBy(key -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new PeopleHourAvgCount());

        resultDS.print("人均访问次数为:");

        env.execute("AggWindowTest");
    

    private static class PeopleHourAvgCount implements AggregateFunction<UserPojo, Tuple2<HashSet<Integer>, Long>, Double> 

        /**
         * 初始化累加器
         */
        @Override
        public Tuple2<HashSet<Integer>, Long> createAccumulator() 
            return Tuple2.of(new HashSet<>(), 0L);
        

        /**
         *
         */
        @Override
        public Tuple2<HashSet<Integer>, Long> add(UserPojo value, Tuple2<HashSet<Integer>, Long> accumulator) 

            //distinct userId
            accumulator.f0.add(value.getUserId());
            //次数+1
            accumulator.f1 = accumulator.f1 + 1;

            //返回累加器
            return accumulator;
        

        @Override
        public Double getResult(Tuple2<HashSet<Integer>, Long> accumulator) 

            return accumulator.f1 * 1.0 / accumulator.f0.size();
        

        @Override
        public Tuple2<HashSet<Integer>, Long以上是关于Flink 滚动窗口滑动窗口详解的主要内容,如果未能解决你的问题,请参考以下文章

11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动

flink 滚动窗口滑动窗口会话窗口全局窗口

flink窗口的种类及详述

Flink之Watermark滑动窗口案例

Flink的窗口计算案例

Flink的窗口计算案例