从0到1Flink的成长之路(十八)-Window API

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(十八)-Window API相关的知识,希望对你有一定的参考价值。

Window API

1 window和windowAll

使用keyBy的流:Keyed Stream,应该使用window方法
在这里插入图片描述
未使用keyBy的流:Non-Keyed Stream,应该调用windowAll方法
在这里插入图片描述
修改词频统计WordCount程序,演示案例如下:

package xx.xxxxx.flink.start;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 基于 Flink 流计算实时词频统计WordCount, 进行窗口Window内数据统计
*/
public class WindowWordCount {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env:流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1) ;
// 2. 数据源-source:Socket接收数据
DataStreamSource<String> inputStream = env.socketTextStream("node1.itcast.cn", 9999);
// 3. 转换处理-transformation:调用DataSet函数,处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = inputStream
// a. 过滤数据
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
return null != line && line.trim().length() > 0;
}
})
// b. 分割单词
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.trim().toLowerCase().split("\\\\W+");
for (String word : words) {
out.collect(word);
}
}
})
// c. 转换二元组,表示每个单词出现一次
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// TODO: 针对KeyedDataStream进行时间窗口聚合操作,先分组,后窗口,再聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> windowCountStream = tupleStream
// 按照单词分组,下标索引为0
.keyBy(0)
// TODO: 设置时间窗口大小为3秒
.window(TumblingProcessingTimeWindows.of(Time.seconds(3))) //.timeWindow(Time.seconds(3))
.sum(1);
windowCountStream.print("keyByWindow");
// TODO: 针对非KeyedDataStream进行时间窗口聚合操作,先窗口,再聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> windowStream = tupleStream
// TODO: 设置时间窗口大小为3秒
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3))) //.timeWindowAll(Time.seconds(3))
.sum(1);
windowStream.printToErr("windowAll");
// 5. 触发执行-execute
env.execute(WindowWordCount.class.getSimpleName()) ;
}
}

2 WindowAssigner
window/windowAll 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每
条输入的数据分发到正确的 window 中,Flink提供了很多各种场景用的WindowAssigner:
在这里插入图片描述
3 Evictor(了解)
evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代
码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的
evicBefore 和 evicAfter两个方法。
Flink 提供了如下三种通用的 evictor:
CountEvictor:保留指定数量的元素;
TimeEvictor:设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中
max_ts 是窗口内时间戳的最大值;
DeltaEvictor:执行用户给定的DeltaFunction及预设的theshold,判断是否删除一个元素;
4 Trigger(了解)
trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认
trigger,如果默认的 trigger 不能满足需求,则可以自定义一个类,继承自Trigger 即可,详细
描述下Trigger 的接口以及含义:

onElement():每次往 window 增加一个元素的时候都会触发;
onEventTime():当 event-time timer 被触发的时候会调用;
onProcessingTime():当 processing-time timer 被触发的时候会调用;
onMerge():对两个 trigger 的 state 进行 merge 操作;
clear():window 销毁的时候被调用;

上面的接口中前三个会返回一个 TriggerResult,TriggerResult有如下几种可能的选择:

CONTINUE 不做任何事情
FIRE 触发 window
PURGE 清空整个 window 的元素并销毁窗口
FIRE_AND_PURGE 触发窗口,然后销毁窗口

5 API 调用示例
在这里插入图片描述
source.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
在这里插入图片描述
source.keyBy(0).timeWindow(Time.seconds(5))
在这里插入图片描述

以上是关于从0到1Flink的成长之路(十八)-Window API的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(二十)-案例:时间会话窗口

从0到1Flink的成长之路(十七)-高级特性(Flink四大基石)

从0到1Flink的成长之路

从0 到1Flink的成长之路

从0到1Flink的成长之路

从0到1Flink的成长之路