Big Data (9e): Flink Windows, Illustrated
Posted by 小基基o_O
1. Code template
Local development environment: Windows 10 + IntelliJ IDEA
Only modify the code between the two `business logic` (业务逻辑) marker comments.
1.1 pom.xml
<!-- Properties -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
1.2 log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
1.3 Java template
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Scanner;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Add the custom source (reads lines typed into the console)
        DataStreamSource<String> dss = env.addSource(new MySource());
        //################################### business logic ########################################
        dss.print();
        //################################### business logic ########################################
        env.execute();
    }

    public static class MySource implements SourceFunction<String> {
        // Set to false by cancel(); checked before each line is read
        private volatile boolean running = true;

        public MySource() {
        }

        @Override
        public void run(SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            // Emit every non-empty console line; typing STOP (or closing stdin) ends the source
            while (running && scanner.hasNextLine()) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) break;
                if (!str.equals("")) sc.collect(str);
            }
            scanner.close();
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
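2. Keyed and non-keyed windows
- Keyed windows are applied after keyBy. Every key gets its own windows, so the window computation can run in parallel.
- Non-keyed windows are applied to the whole stream (windowAll / countWindowAll). The window operator always runs with parallelism 1.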
Syntax | Keyed | Non-Keyed |
---|---|---|
Time-based window | .keyBy(...).window(...) | .windowAll(...) |
Count-based window | .keyBy(...).countWindow(...) | .countWindowAll(...) |
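The following plain-Java simulation makes the keyed/non-keyed difference concrete. It has no Flink dependency, and the class `KeyedVsNonKeyedSketch` and its methods are hypothetical. It counts elements in 5-second tumbling windows twice: once per (key, window), as `.keyBy(...).window(...)` does, and once per window, as `.windowAll(...)` does. It assumes timestamps in milliseconds and windows aligned to 0.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation (not Flink code) of counting elements in
// 5-second tumbling windows, keyed vs. non-keyed. Timestamps are in milliseconds.
class KeyedVsNonKeyedSketch {

    static final long SIZE = 5_000;

    // Start of the tumbling window containing ts (offset assumed to be 0)
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, SIZE);
    }

    // Like .keyBy(...).window(...): one window per (key, time range)
    static Map<String, Integer> keyedCounts(long[] ts, String[] keys) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < ts.length; i++) {
            counts.merge(keys[i] + "@" + windowStart(ts[i]), 1, Integer::sum);
        }
        return counts;
    }

    // Like .windowAll(...): one window per time range, shared by all keys
    static Map<Long, Integer> nonKeyedCounts(long[] ts) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : ts) {
            counts.merge(windowStart(t), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 2_000, 3_000, 6_000};
        String[] keys = {"a", "b", "a", "a"};
        System.out.println(keyedCounts(ts, keys)); // {a@0=2, a@5000=1, b@0=1}
        System.out.println(nonKeyedCounts(ts));    // {0=3, 5000=1}
    }
}
```

With this input, key `a` gets two separate windows, while the non-keyed version puts `a` and `b` into the same window.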
3. Types of windows
- Windows cut an unbounded stream into bounded chunks.
- https://yellow520.blog.csdn.net/article/details/121288240
3.1 Time-based windows
Sliding time window: each window is 6 s long, and a new window starts every 3 s
.window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))
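This sketch shows why one element can land in several sliding windows. It is a plain-Java model, not Flink's API, and `SlidingWindowSketch` is a hypothetical name. It follows the same idea as Flink's sliding assigners. First it finds the latest window start aligned to the slide. Then it steps back by `slide` for as long as the window still covers the timestamp. The offset is assumed to be 0, and timestamps are in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink's API) of sliding-window assignment,
// assuming a window offset of 0 and timestamps in milliseconds.
class SlidingWindowSketch {

    // Start of the latest window aligned to `step` that contains `ts`
    static long alignedStart(long ts, long step) {
        return ts - Math.floorMod(ts, step);
    }

    // Returns the [start, end) bounds of every window of length `size`,
    // sliding by `slide`, that contains `ts` (latest window first)
    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = alignedStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size 6 s, slide 3 s, one element at t = 7 s
        for (long[] w : assign(7_000, 6_000, 3_000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a size of 6 s and a slide of 3 s, every element belongs to size / slide = 2 windows. The element at t = 7 s falls into [6000, 12000) and [3000, 9000).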
Tumbling time window: consecutive, non-overlapping 3 s windows
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
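A tumbling window is the special case in which each timestamp maps to exactly one window. Below is a minimal plain-Java sketch with a hypothetical class name, offset 0, and timestamps in milliseconds. Flink's documentation notes that time windows are aligned to the epoch. The sketch models this by rounding each timestamp down to a multiple of the window size.

```java
// Plain-Java sketch (not Flink's API) of tumbling-window assignment,
// assuming an offset of 0 and timestamps in milliseconds.
class TumblingWindowSketch {

    // Each timestamp belongs to exactly one window [start, start + size)
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    public static void main(String[] args) {
        long size = 3_000; // 3 s, as in TumblingProcessingTimeWindows.of(Time.seconds(3))
        for (long ts : new long[] {0, 2_999, 3_000, 7_500}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```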
Session time window: a session closes once no data has arrived for 2 s (the session gap)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
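Session windows have no fixed boundaries. Flink's session assigners give each element its own window [t, t + gap) and then merge any windows that overlap. The following plain-Java sketch reproduces that merging on a sorted list of timestamps. `SessionWindowSketch` is a hypothetical class with no Flink dependency, and timestamps are in milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (not Flink's API) of session windows with a fixed gap:
// every element opens a window [ts, ts + gap), and overlapping windows merge.
class SessionWindowSketch {

    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts);
        List<long[]> result = new ArrayList<>();
        for (long t : ts) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                last[1] = Math.max(last[1], t + gap); // overlaps: extend the current session
            } else {
                result.add(new long[] {t, t + gap});  // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // gap = 2 s, as in ProcessingTimeSessionWindows.withGap(Time.seconds(2))
        for (long[] s : sessions(new long[] {0, 1_000, 2_500, 6_000}, 2_000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a 2 s gap, the elements at 0, 1 s and 2.5 s form one session, [0, 4500). The element at 6 s starts a new session.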
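Global window: despite being listed here, it is not time-based. All elements (of a key) go into one single window. Its default trigger never fires, so a custom trigger must be set with .trigger(...)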
.window(GlobalWindows.create())
3.2 Count-based windows
Sliding count window: fires every 3 elements (per key) and evaluates the latest 4 elements
.countWindow(4,3)
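In Flink's `KeyedStream`, `countWindow(size, slide)` is built from `GlobalWindows`, a `CountTrigger` of `slide`, and a `CountEvictor` of `size`. The plain-Java sketch below mimics that combination for a single key, so you can predict what `.countWindow(4,3)` emits. The class name is hypothetical, and this is not Flink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java sketch (not Flink's API) of .countWindow(size, slide) for one key:
// fire every `slide` elements and evaluate at most the latest `size` elements
// (older elements are evicted before the window function runs).
class SlidingCountWindowSketch {

    static List<List<Integer>> fire(List<Integer> input, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> firings = new ArrayList<>();
        int sinceLastFire = 0;
        for (int x : input) {
            buffer.addLast(x);
            if (++sinceLastFire == slide) {
                while (buffer.size() > size) buffer.removeFirst(); // keep the newest `size`
                firings.add(new ArrayList<>(buffer));
                sinceLastFire = 0;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 10; i++) input.add(i);
        System.out.println(fire(input, 4, 3)); // [[1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]
    }
}
```

The first firing holds only 3 elements, because fewer than 4 have arrived by then.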
Tumbling count window: fires every 4 elements (per key), and each element belongs to exactly one window
.countWindow(4)
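`countWindow(size)` instead combines `GlobalWindows` with a purging `CountTrigger`, so the buffer is cleared after each firing. Here is a matching plain-Java sketch for a single key. The class name is hypothetical, and this is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink's API) of .countWindow(size) for one key:
// fire after every `size` elements, then purge the buffer.
class TumblingCountWindowSketch {

    static List<List<Integer>> fire(List<Integer> input, int size) {
        List<Integer> buffer = new ArrayList<>();
        List<List<Integer>> firings = new ArrayList<>();
        for (int x : input) {
            buffer.add(x);
            if (buffer.size() == size) {
                firings.add(new ArrayList<>(buffer));
                buffer.clear(); // purge: the next window starts empty
            }
        }
        // Elements still in `buffer` wait for more input and are not emitted
        return firings;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 10; i++) input.add(i);
        System.out.println(fire(input, 4)); // [[1, 2, 3, 4], [5, 6, 7, 8]]
    }
}
```

Elements 9 and 10 stay buffered until two more elements with the same key arrive.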
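4. Window functions
Window function | How it processes the window's elements |
---|---|
ReduceFunction | Incremental: each element is folded into a single running value as it arrives; efficient |
AggregateFunction | Incremental: each element is added to an accumulator as it arrives; efficient |
ProcessWindowFunction | All elements of the window are buffered until the window fires, then passed in at once; less efficient, but the function can read window metadata through its Context |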
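The efficiency difference in the table comes down to what the window keeps in state. The plain-Java sketch below computes the same sum in two ways. Its classes are hypothetical and are not Flink's API. The incremental version stores one number. The buffered version stores every element until the window fires.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink's API) contrasting the state a window keeps
// for an incremental function vs. a function that sees the whole window.
class IncrementalVsBufferedSketch {

    // Incremental (ReduceFunction / AggregateFunction style):
    // one accumulator, updated as each element arrives
    static final class IncrementalSum {
        private long acc = 0;
        void add(long x) { acc += x; }
        long fire() { return acc; }
    }

    // Buffered (ProcessWindowFunction style): keep every element
    // until the window fires, then iterate over all of them
    static final class BufferedSum {
        private final List<Long> elements = new ArrayList<>();
        void add(long x) { elements.add(x); }
        int bufferedCount() { return elements.size(); }
        long fire() {
            long sum = 0;
            for (long x : elements) sum += x;
            return sum;
        }
    }

    public static void main(String[] args) {
        IncrementalSum inc = new IncrementalSum();
        BufferedSum buf = new BufferedSum();
        for (long x = 1; x <= 1_000; x++) {
            inc.add(x);
            buf.add(x);
        }
        // Same result, but the buffered version held 1000 elements in memory
        System.out.println(inc.fire() + " " + buf.fire() + " " + buf.bufferedCount());
    }
}
```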
In the code template, replace the code between the two business logic marker comments with each snippet below (the imports go at the top of Hello.java).
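4.1 ReduceFunction
// Imports (place them at the top of Hello.java)
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Key by the string itself, collect it in 5-second tumbling processing-time windows,
// and join the values of each key with commas
dss.keyBy(s -> s)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce((ReduceFunction<String>) (v1, v2) -> v1 + "," + v2)
        .print("output");
Tumbling time window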
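Here is a plain-Java sketch of the values the keyed reduce produces for the elements of one 5-second window. `ReduceSketch` is hypothetical, and `java.util.function.BinaryOperator` stands in for Flink's `ReduceFunction`. It assumes all listed elements arrived within the same window.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java sketch (not Flink code): what a keyed reduce computes for
// the elements that arrive within ONE tumbling window.
class ReduceSketch {

    static Map<String, String> reduceWindow(List<String> window, BinaryOperator<String> reducer) {
        // One running value per key; each new element is folded in immediately,
        // so only this single value is stored per key (incremental aggregation)
        Map<String, String> state = new LinkedHashMap<>();
        for (String s : window) {
            state.merge(s, s, reducer); // keyBy(s -> s): the element is its own key
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "a", "a");
        System.out.println(reduceWindow(window, (v1, v2) -> v1 + "," + v2)); // {a=a,a,a, b=b}
    }
}
```

So typing a, b, a, a within one window should print `a,a,a` and `b`, each prefixed by the print label.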
4.2 AggregateFunction
// Imports (place them at the top of Hello.java)
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Count the elements in 10-second windows that slide every 5 seconds
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        // AggregateFunction<IN, ACC, OUT>
        .aggregate(new AggregateFunction<String, Long, Long>() {
            // Create the accumulator
            @Override
            public Long createAccumulator() { return 0L; }

            // Add one element to the accumulator
            @Override
            public Long add(String in, Long acc) { return acc + 1L; }

            // Get the result from the accumulator
            @Override
            public Long getResult(Long acc) { return acc; }

            // Merge two accumulators
            @Override
            public Long merge(Long a1, Long a2) { return a1 + a2; }
        })
        .print("output");
Sliding time window
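The four methods of `AggregateFunction` form a small lifecycle, and the plain-Java sketch below drives it by hand. `Agg` is a hypothetical interface that only mirrors the method names of Flink's `AggregateFunction<IN, ACC, OUT>`. `COUNT` reproduces the counting logic above. In Flink, `merge` is needed for merging window assigners such as session windows, and the sliding windows in this example never merge. Because the windows overlap, each element is also counted in two consecutive results.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch. `Agg` is a hypothetical stand-in that mirrors the four
// methods of Flink's AggregateFunction<IN, ACC, OUT>; it is not Flink's interface.
class AggregateSketch {

    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // Same counting logic as the AggregateFunction in the example above
    static final Agg<String, Long, Long> COUNT = new Agg<String, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String in, Long acc) { return acc + 1L; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    };

    // Drive one window through the lifecycle: create, add per element, get result
    static <IN, ACC, OUT> OUT runWindow(Agg<IN, ACC, OUT> f, List<IN> elements) {
        ACC acc = f.createAccumulator();
        for (IN e : elements) {
            acc = f.add(e, acc);
        }
        return f.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(runWindow(COUNT, Arrays.asList("x", "y", "z"))); // 3
        // merge() combines two partial accumulators, e.g. when session windows merge
        System.out.println(COUNT.getResult(COUNT.merge(2L, 5L)));           // 7
    }
}
```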
4.3 ProcessWindowFunction
Source excerpt of ProcessAllWindowFunction, the non-keyed variant used with windowAll (simplified):
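public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> {
    public abstract void process(
            ProcessAllWindowFunction<IN, OUT, W>.Context var1, // context object (gives access to the window)
            Iterable<IN> var2,                                 // all input elements in the window
            Collector<OUT> var3                                // collector for the output
    ) throws Exception;
}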
Code
// Imports (place them at the top of Hello.java)
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<String> in, Collector<String> out) {
                // Print the window's time range
                System.out.println(context.window().toString());
                // Emit all elements of the window as one string
                out.collect(String.valueOf(in));
            }
        })
        .print("output");
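Unlike the two incremental functions, `process` runs only once the window fires, so the operator has to buffer every element first. The plain-Java sketch below models this. `ProcessAllWindowSketch` is hypothetical and is not Flink code. It assumes a window size of 10 s, a slide of 5 s, offset 0, and timestamps in milliseconds. It buffers elements per sliding window, then passes each window's bounds and full element list to a `process` step. This is why the same element appears in two consecutive outputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink code) of a buffering window operator feeding a
// ProcessAllWindowFunction-style step. Windows: size 10 s, slide 5 s, offset 0.
class ProcessAllWindowSketch {

    static final long SIZE = 10_000, SLIDE = 5_000;

    // Buffer every element in every sliding window that contains it,
    // keyed by window start (TreeMap keeps the windows in time order)
    static Map<Long, List<String>> buffer(long[] ts, String[] values) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < ts.length; i++) {
            long lastStart = ts[i] - Math.floorMod(ts[i], SLIDE);
            for (long start = lastStart; start > ts[i] - SIZE; start -= SLIDE) {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
            }
        }
        return windows;
    }

    // The "process" step: sees the window bounds and ALL buffered elements at once
    static List<String> process(Map<Long, List<String>> windows) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, List<String>> w : windows.entrySet()) {
            long start = w.getKey();
            out.add("[" + start + ", " + (start + SIZE) + ") " + w.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {101_000, 106_000, 112_000};
        String[] values = {"a", "b", "c"};
        process(buffer(ts, values)).forEach(System.out::println);
    }
}
```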