从0到1Flink的成长之路(二十)-案例:时间会话窗口
Posted 熊老二-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(二十)-案例:时间会话窗口相关的知识,希望对你有一定的参考价值。
案例:时间会话窗口
1 需求
设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
2 代码实现
窗口统计案例演示:时间会话窗口(Time Session Window),数字累加求和统计
package xx.xxxxxx.flink.window.session;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* 窗口统计案例演示:时间会话窗口(Time Session Window),数字累加求和统计
*/
public class StreamSessionWindow {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env:流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1) ;
// 2. 数据源-source:Socket接收数据
DataStreamSource<String> inputStream = env.socketTextStream("node1.itcast.cn", 9999);
// 3. 转换处理-transformation:调用DataStream函数,处理数据
SingleOutputStreamOperator<Integer> numberStream = inputStream
// a. 过滤数据
.filter(line -> null != line && line.trim().length() > 0)
// b. 转换整数
.map(line -> Integer.parseInt(line.trim()));
// TODO:设置会话超时时间为5秒,5秒内没有数据到来,则触发上个窗口计算
SingleOutputStreamOperator<Integer> sumStream = numberStream
// 设置会话超时时间为5秒
.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.sum(0);
// 4. 数据终端-sink
sumStream.printToErr("sum");
// 5. 触发执行-execute
env.execute(StreamSessionWindow.class.getSimpleName()) ; } }
以上是关于从0到1Flink的成长之路(二十)-案例:时间会话窗口的主要内容,如果未能解决你的问题,请参考以下文章
从0到1Flink的成长之路(二十)-Flink 高级特性之 Flink 容错机制
从0到1Flink的成长之路(二十)-Flink 高级特性之有状态计算场景
从0到1Flink的成长之路(二十)-Flink 高级特性之Checkpoint 配置方式