Flink的窗口聚合操作(TimeCount Window)
Posted 月疯
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的窗口聚合操作(TimeCount Window)相关的知识,希望对你有一定的参考价值。
窗口基本概念:Flink中的窗口是左闭右开的窗口
Flink认为批处理是流处理的一个特例,而窗口window就是从流处理到批处理的一个桥梁,通常来讲窗口就是用来将无线数据流转换为优先数据集,从而在优先数据集上进行操作的一种机制,在flink当中支持基于无限大(永久)窗口的集合操作以及特定窗口的集合操作。
在flink当中支持的窗口聚合主要分为2种:window Aggregate和Over Aggregate,window Aggregate只有当窗口结束时才会输出结果,window aggregate从整体上分为3种类型:Time Window、Count Cindow 和自定义Window,其中每一种Window从粒度上又细分为滚动窗口(tumbling windows)、滑动窗口(sliding windows)和Session回话窗口。
无界窗口案列说明:
package Flink_Window;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
//flink原生支持的无界和操作
public class SocketInfiniteWindow
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource=env.socketTextStream("192.168.208.112",8821,"\\n");
DataStream<Tuple2<String,Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>()
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception
String[] split=s.split("\\\\W+");
for(String word:split)
collector.collect(Tuple2.of(word,1));
);
//原来基础上,不断进行聚合操作
//进行聚合keyby,没有time,这就是Flink原生操作的无界窗口操作
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).sum(1);
sum.print();
env.execute("SocketInfiniteWindow");
有界窗口:
1、Time Window使用
Time Window是根据时间对数据流进行分组的,它支持滚动窗口(tumbling windows)、滑动窗口(sliding windows)和Session回话窗口。
2种使用方式:
timeWindow(Time.minutes(1)):表示滚动窗口的大小是1min,对每一分钟内的数据进行聚合计算,即每隔1min计算一下最近内的数据;timeWindow(Time.minutes(1).Time.seconds(30)):表示滑动窗口的大小为1min,滑动间隔30秒,即每隔30秒统计最近1min内的数据。
案列:
package Flink_Window;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
//flink原生支持的无界和操作
public class SocketInfiniteWindow
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource=env.socketTextStream("192.168.208.112",8821,"\\n");
DataStream<Tuple2<String,Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>()
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception
String[] split=s.split("\\\\W+");
for(String word:split)
collector.collect(Tuple2.of(word,1));
);
//原来基础上,不断进行聚合操作
//进行聚合keyby,没有time,这就是Flink原生操作的无界窗口操作
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).sum(1);
// sum.print();
//有界滚动窗口(30秒计算一次)
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).timeWindow(Time.seconds(30)).sum(1);
// sum.print();
//有界滑动窗口(30秒计算一次)
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).timeWindow(Time.seconds(30),Time.seconds(2)).sum(1);
sum.print();
env.execute("SocketInfiniteWindow");
2、Count window使用
Count Window是根据元素的个数对数据流进行分组的,它支持滚动窗口(tumbling windows)、滑动窗口(sliding windows) 和Session回话窗口。
2种使用方式:
countWindow(100):表示滚动窗口的大小是100个元素,当窗口中填满100个元素的时候,就会对窗口进行计算,即没间隔100个元素计算一次countWindow(100,10):表示滑动窗口的大小是100个元素,滑动间隔为10个元素,也就是说每新增10各元素就会对前面100个元素计算一次。
案例:
package Flink_Window;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
//flink原生支持的无界和操作
public class SocketInfiniteWindow
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource=env.socketTextStream("192.168.208.112",8821,"\\n");
DataStream<Tuple2<String,Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>()
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception
String[] split=s.split("\\\\W+");
for(String word:split)
collector.collect(Tuple2.of(word,1));
);
//原来基础上,不断进行聚合操作
//进行聚合keyby,没有time,这就是Flink原生操作的无界窗口操作
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).sum(1);
// sum.print();
//有界滚动窗口(30秒计算一次)
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).countWindow(5).sum(1);
// sum.print();
//有界滑动窗口(30秒计算一次)
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowCounts.keyBy(0).countWindow(5,2).sum(1);
sum.print();
env.execute("SocketInfiniteWindow");
3、自定义window
可以分为2种:一种是基于key的window,一种是不基于key的window,2种使用方式:
.keyBy.window():属于基于key的window,会先对窗口中的数据进行分组,然后在计算;
.windowAll():属于不基于key的window,会对窗口中的所有数据进行聚合操作;
其实:有keyBy就是window,没有keyBy就是windowALl。
Time window和Count Window的源码,这俩个window本质上就是针对window的封装实现。
滚动窗口:
滚动窗口(TUMBLE)将每个袁旭分配到一个指定大小的窗口中,通常滚动窗口有一个固定的大小,窗口之间不会出现重叠;滑动窗口(HOP)不同于滚动窗口,窗口之间可以休闲重叠,对于滑动窗口来说,有俩个重要的参数:slide和size,size为窗口的大小,slide为每次窗口滑动的步长。
滚动窗口(tumbling windows):表示窗口内的数据没有重叠
Session会话;
案列分析:
以上是关于Flink的窗口聚合操作(TimeCount Window)的主要内容,如果未能解决你的问题,请参考以下文章