大数据(9e)图解Flink窗口

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9e)图解Flink窗口相关的知识,希望对你有一定的参考价值。

文章目录

1、代码模板

本地开发环境:WIN10+IDEA
只改##################### 业务逻辑 #####################之间的代码

1.1、pom.xml

<!-- 配置 -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>$flink.version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_$scala.binary.version</artifactId>
        <version>$flink.version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_$scala.binary.version</artifactId>
        <version>$flink.version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_$scala.binary.version</artifactId>
        <version>$flink.version</version>
    </dependency>
    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>$slf4j.version</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>$slf4j.version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>$log4j.version</version>
    </dependency>
</dependencies>

1.2、log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

1.3、Java模板

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Scanner;

public class Hello 
    public static void main(String[] args) throws Exception 
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //加入自定义数据源
        DataStreamSource<String> dss = env.addSource(new MySource());
        //################################### 业务逻辑 ########################################
        dss.print();
        //################################### 业务逻辑 ########################################
        env.execute();
    

    public static class MySource implements SourceFunction<String> 
        public MySource() 

        @Override
        public void run(SourceContext<String> sc) 
            Scanner scanner = new Scanner(System.in);
            while (true) 
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) break;
                if (!str.equals("")) sc.collect(str);
            
            scanner.close();
        

        @Override
        public void cancel() 
    

2、按键分区(Keyed)、非按键分区(Non-Keyed)

Non-Keyed的窗口的流的并行度=1

2.1、Keyed

基于时间的窗口

.keyBy(...)
.window(...)

基于事件个数的窗口

.keyBy(...)
.countWindow(...)

2.2、Non-Keyed

基于时间的窗口

.windowAll(...)

基于事件个数的窗口

.countWindowAll(...)

3、窗口的分类

  • 将 无界限的 数据 切分为 有界限的 数据
  • https://yellow520.blog.csdn.net/article/details/121288240

3.1、基于时间的窗口

基于时间滑动窗口

.window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))

基于时间滚动窗口

.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))

基于时间会话窗口

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))

基于时间的全局窗口

.window(GlobalWindows.create())

3.2、基于事件个数的窗口

基于事件个数滑动窗口

.countWindow(4,3)

基于事件个数滚动窗口

.countWindow(4)

4、窗口函数

窗口函数窗口关闭时,窗口函数就去处理窗口中的每个元素
ReduceFunction增量处理,高效
AggregateFunction增量处理,高效
ProcessWindowFunction函数执行前要在内部缓存窗口上所有的元素,低效

5、示例代码

修改代码模板中##################### 业务逻辑 #####################之间的代码

5.1、ReduceFunction

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
dss.keyBy(s -> s)
   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
   .reduce((ReduceFunction<String>) (v1, v2) -> v1 + "," + v2)
   .print("输出");

基于时间滚动窗口

5.2、AggregateFunction

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
   //AggregateFunction<IN, ACC, OUT>
   .aggregate(new AggregateFunction<String, Long, Long>() 
       //创建累加器
       @Override
       public Long createAccumulator() return 0L;
       //累加
       @Override
       public Long add(String in, Long acc) return acc + 1L;
       //从累加器获取结果
       @Override
       public Long getResult(Long acc) return acc;
       //合并累加器
       @Override
       public Long merge(Long a1, Long a2) return a1 + a2;
   )
   .print("输出");

基于时间滑动窗口

5.3、ProcessWindowFunction

源码截取

abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> 
    abstract void process(
        ProcessAllWindowFunction<IN, OUT, W>.Context var1,  //上下文对象
        Iterable<IN> var2,                                  //窗口内的所有输入
        Collector<OUT> var3                                 //收集器
    );

代码

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
   .process(new ProcessAllWindowFunction<String, String, TimeWindow>() 
       @Override
       public void process(Context context, Iterable<String> in, Collector<String> out) 
           //打印窗口范围
           System.out.println(context.window().toString());
           //在窗口内,收集元素
           out.collect(String.valueOf(in));
       
   )
   .print("输出");

测试运行截图

以上是关于大数据(9e)图解Flink窗口的主要内容,如果未能解决你的问题,请参考以下文章

大数据(9e)图解Flink窗口

大数据(9e)图解Flink窗口

大数据(9e)图解Flink窗口

大数据(9e)Flink侧输出流

大数据(9e)Flink侧输出流

大数据(9e)Flink定时器