Flink的窗口聚合操作(TimeCount Window)

Posted 月疯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的窗口聚合操作(TimeCount Window)相关的知识,希望对你有一定的参考价值。


窗口基本概念:Flink中的窗口是左闭右开的窗口

Flink认为批处理是流处理的一个特例,而窗口window就是从流处理到批处理的一个桥梁,通常来讲窗口就是用来将无线数据流转换为优先数据集,从而在优先数据集上进行操作的一种机制,在flink当中支持基于无限大(永久)窗口的集合操作以及特定窗口的集合操作。

在flink当中支持的窗口聚合主要分为2种:window Aggregate和Over Aggregate,window Aggregate只有当窗口结束时才会输出结果,window aggregate从整体上分为3种类型:Time Window、Count Cindow 和自定义Window,其中每一种Window从粒度上又细分为滚动窗口(tumbling windows)、滑动窗口(sliding windows)和Session回话窗口。 

无界窗口案列说明:

package Flink_Window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//flink原生支持的无界和操作
public class SocketInfiniteWindow 
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource=env.socketTextStream("192.168.208.112",8821,"\\n");
        DataStream<Tuple2<String,Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() 
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception 

                String[] split=s.split("\\\\W+");
                for(String word:split)
                    collector.collect(Tuple2.of(word,1));
                
            
        );
        //原来基础上,不断进行聚合操作
        //进行聚合keyby,没有time,这就是Flink原生操作的无界窗口操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).sum(1);
        sum.print();
        env.execute("SocketInfiniteWindow");
    

有界窗口:

1、Time Window使用
 Time Window是根据时间对数据流进行分组的,它支持滚动窗口(tumbling windows)、滑动窗口(sliding windows)和Session回话窗口。

2种使用方式:
timeWindow(Time.minutes(1)):表示滚动窗口的大小是1min,对每一分钟内的数据进行聚合计算,即每隔1min计算一下最近内的数据;

timeWindow(Time.minutes(1).Time.seconds(30)):表示滑动窗口的大小为1min,滑动间隔30秒,即每隔30秒统计最近1min内的数据。

案列:

package Flink_Window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

//flink原生支持的无界和操作
public class SocketInfiniteWindow 
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource=env.socketTextStream("192.168.208.112",8821,"\\n");
        DataStream<Tuple2<String,Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() 
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception 

                String[] split=s.split("\\\\W+");
                for(String word:split)
                    collector.collect(Tuple2.of(word,1));
                
            
        );
        //原来基础上,不断进行聚合操作
        //进行聚合keyby,没有time,这就是Flink原生操作的无界窗口操作
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).sum(1);
//        sum.print();
        //有界滚动窗口(30秒计算一次)
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).timeWindow(Time.seconds(30)).sum(1);
//        sum.print();
        //有界滑动窗口(30秒计算一次)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).timeWindow(Time.seconds(30),Time.seconds(2)).sum(1);
        sum.print();
        env.execute("SocketInfiniteWindow");
    

2、Count window使用

Count Window是根据元素的个数对数据流进行分组的,它支持滚动窗口(tumbling windows)、滑动窗口(sliding windows) 和Session回话窗口。

2种使用方式:
countWindow(100):表示滚动窗口的大小是100个元素,当窗口中填满100个元素的时候,就会对窗口进行计算,即没间隔100个元素计算一次

countWindow(100,10):表示滑动窗口的大小是100个元素,滑动间隔为10个元素,也就是说每新增10各元素就会对前面100个元素计算一次。

案例:

package Flink_Window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

//flink原生支持的无界和操作
public class SocketInfiniteWindow 
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource=env.socketTextStream("192.168.208.112",8821,"\\n");
        DataStream<Tuple2<String,Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() 
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception 

                String[] split=s.split("\\\\W+");
                for(String word:split)
                    collector.collect(Tuple2.of(word,1));
                
            
        );
        //原来基础上,不断进行聚合操作
        //进行聚合keyby,没有time,这就是Flink原生操作的无界窗口操作
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).sum(1);
//        sum.print();
        //有界滚动窗口(30秒计算一次)
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).countWindow(5).sum(1);
//        sum.print();
        //有界滑动窗口(30秒计算一次)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).countWindow(5,2).sum(1);
        sum.print();
        env.execute("SocketInfiniteWindow");
    

3、自定义window

可以分为2种:一种是基于key的window,一种是不基于key的window,2种使用方式:
.keyBy.window():属于基于key的window,会先对窗口中的数据进行分组,然后在计算;
.windowAll():属于不基于key的window,会对窗口中的所有数据进行聚合操作;
其实:有keyBy就是window,没有keyBy就是windowALl。

Time window和Count Window的源码,这俩个window本质上就是针对window的封装实现。 

滚动窗口:

滚动窗口(TUMBLE)将每个袁旭分配到一个指定大小的窗口中,通常滚动窗口有一个固定的大小,窗口之间不会出现重叠;滑动窗口(HOP)不同于滚动窗口,窗口之间可以休闲重叠,对于滑动窗口来说,有俩个重要的参数:slide和size,size为窗口的大小,slide为每次窗口滑动的步长。

滚动窗口(tumbling windows):表示窗口内的数据没有重叠

 Session会话;

案列分析:

以上是关于Flink的窗口聚合操作(TimeCount Window)的主要内容,如果未能解决你的问题,请参考以下文章

Flink窗口聚合案例(增量聚合全量聚合)

Flink流处理的时间窗口

Flink SQL --- 窗口聚合

Flink SQL--- Over Aggregation

Apache Flink:测试使用reduce增量聚合和windowAll操作

flink流计算随笔