Flink AggregateFunction窗口函数,执行步骤流程与实例

Posted 二十六画生的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink AggregateFunction窗口函数,执行步骤流程与实例相关的知识,希望对你有一定的参考价值。

先keyby分组,使用计数窗口计算,实例: 

 
package operator;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
// Tuple4<Long,Long,Long,Timestamp>
public class AggregateFunction_2 
    public static void main(String[] args)throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Tuple3<String, Integer, Timestamp>> dataStream = env.addSource(new SourceFunction<Tuple3<String, Integer, Timestamp>>() 
            boolean runing = true;
            @Override
            public void run(SourceContext<Tuple3<String, Integer, Timestamp>> ctx) throws Exception 
                //ctx.collect(new Tuple3("user" , 2, new Timestamp(new Date().getTime())));
                int i = 1;
                while (runing) 
                    Tuple3<String, Integer, Timestamp> t3;
                    Thread.sleep(1000);
                    if (i % 2 == 1)  //判断
                        t3 = new Tuple3("user" + 1, 1, new Timestamp(new Date().getTime()));
                     else 
                        t3 = new Tuple3("user" + i, i, new Timestamp(new Date().getTime()));
                    
                    //System.out.println("=======");
                    //System.out.println(t3);
                    i = i + 1;
                    ctx.collect(t3);
                    /* 返回
                    user1  1
                    user2  2
                    user1  1
                    user4  4
                    user1  1
                    user6  6
                     */
                
            
            @Override
            public void cancel() 
                runing = false;
            
        );
 
/*        DataStream dataStream = env.fromElements(
                Tuple3.of("1",333,new Timestamp(new Date().getTime())),
                Tuple3.of("2", 111,new Timestamp(new Date().getTime())),
                Tuple3.of("1",222,new Timestamp(new Date().getTime())),
                Tuple3.of("2",444,new Timestamp(new Date().getTime())),
                Tuple3.of("9",444,new Timestamp(new Date().getTime())),
                Tuple3.of("6", 555,new Timestamp(new Date().getTime())),
                Tuple3.of("1", 555,new Timestamp(new Date().getTime()))
                     )
                ;
        */
        //dataStream.print();
 
 
        //  输入类型IN    累加器类型ACC   输出 out
        DataStream  data_aggregate  =dataStream
               // .timeWindowAll(Time.seconds(2))\\
                .keyBy(0)     //分组
                .countWindow(2) //2个
               // .sum(1);
               .aggregate(new AggregateFunction<Tuple3<String, Integer, Timestamp>, Tuple3<String, Integer, Timestamp>, Tuple3<String, Integer, Timestamp>>() 
                               @Override
                               // 初始化列累加器 .创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化
                               //来一条数据.相应组内只有一条数据时候执行一次
                               //如果原先有一条,那么新进来一条时候,就不执行了。直接执行add getresult
                               //累加器有点像是中间传递的东西
                               //user1+user1 通过累加器就是 acc_1=acc(初始化)+第一个user,  acc=acc_1+第一个user1
                               //相加的结果都保留在累加器中。相当于一个寄存的地方
                               public Tuple3<String, Integer, Timestamp> createAccumulator() 
                                   System.out.println("------createAccumulator--------"+new Timestamp(new Date().getTime()));
                                   return new Tuple3<>("",0,new Timestamp(new Date().getTime()));
                               
                               //累加器的累加方法 来一条数据执行一次 对于数据的每条数据,和迭代数据的聚合的具体实现
                               @Override
                               public Tuple3<String, Integer, Timestamp> add(Tuple3<String, Integer, Timestamp> value, Tuple3<String, Integer, Timestamp> accumulator) 
                                   System.out.println("------add--------"+value);
                                   accumulator.f0=value.f0;  //类加器的第一个值等于第一个数的fo
                                   accumulator.f1+=value.f1; //第二个值累加
                                   return accumulator;
                               
 
                               // 返回值  在窗口内满足2个,计算结束的时候执行一次  从累加器获取聚合的结果
                               @Override
                               public Tuple3<String, Integer, Timestamp> getResult(Tuple3<String, Integer, Timestamp> accumulator) 
                                   System.out.println("------getResult--------"+accumulator);
                                   return accumulator;
                               
 
                              //合并两个累加器,返回一个具有合并状态的累加器  一般不触发这个
                               @Override
                               public Tuple3<String, Integer, Timestamp> merge(Tuple3<String, Integer, Timestamp> a, Tuple3<String, Integer, Timestamp> b) 
                                   System.out.println("------merge--------"+a);
                                   return null;
                               
                           
                );
 
       data_aggregate.print();
       env.execute("execute");
 
    
 


输出:
------createAccumulator--------2020-10-20 20:52:43.177
------add--------(user1,1,2020-10-20 20:52:43.095)      --进来user1 分组后。组内只有一条user1数据  执行createAccumulator-->add (add是加的初始化的累加器)
 
------createAccumulator--------2020-10-20 20:52:44.179
------add--------(user2,2,2020-10-20 20:52:44.103)      --进来user2 分组后。组内只有一条user2数据   执行createAccumulator-->add(add是加的初始化的累加器)
 
 
------add--------(user1,1,2020-10-20 20:52:45.103)      --又进来user1 分组后 组内有两个user1  满足数量要求 执行add-->getresult  (add:第二个user1+(第一个user1+初始的累加器) )同时由于AggregateFunction是增量计算的。所以清空组内的数据,
------getResult--------(user1,2,2020-10-20 20:52:43.178)
(user1,2,2020-10-20 20:52:43.178)
 
 
------createAccumulator--------2020-10-20 20:52:46.189   --进来user4  组内只有一条user4数据   执行createAccumulator-->add(add是加的初始化的累加器)
------add--------(user4,4,2020-10-20 20:52:46.103)
 
------createAccumulator--------2020-10-20 20:52:47.195   --!!!注意由于上面已经进来了两个user1,输出了。由于AggregateFunction是增量计算的。所以前面两个输出后。该组内被清空了 此时是组第一个
------add--------(user1,1,2020-10-20 20:52:47.104)
 
------createAccumulator--------2020-10-20 20:52:48.2     --进来一个user6 组内只有一个user6   执行createAccumulator-->add(add是加的初始化的累加器)
------add--------(user6,6,2020-10-20 20:52:48.104)
 
------add--------(user1,1,2020-10-20 20:52:49.104)       --进来一个user1 此时组内有两个了 ,满足数量要求,就 add-->getresult输出。同时由于AggregateFunction是增量计算的。所以清空组内的数据,
------getResult--------(user1,2,2020-10-20 20:52:47.195)
(user1,2,2020-10-20 20:52:47.195)
 
 
------createAccumulator--------2020-10-20 20:52:50.109
------add--------(user8,8,2020-10-20 20:52:50.104)
 
------createAccumulator--------2020-10-20 20:52:51.114
------add--------(user1,1,2020-10-20 20:52:51.105)
 
------createAccumulator--------2020-10-20 20:52:52.119
------add--------(user10,10,2020-10-20 20:52:52.105)
 
------add--------(user1,1,2020-10-20 20:52:53.106)
------getResult--------(user1,2,2020-10-20 20:52:51.114)
(user1,2,2020-10-20 20:52:51.114)

以上是关于Flink AggregateFunction窗口函数,执行步骤流程与实例的主要内容,如果未能解决你的问题,请参考以下文章

Flink AggregateFunction窗口函数,merge何时执行

Flink AggregateFunction窗口函数,执行步骤流程与实例

Flink AggregateFunction窗口函数,执行步骤流程与实例

Flink AggregateFunction窗口函数,执行步骤流程与实例

Flink AggregateFunction窗口函数,自定义UDAF,UDF

Flink AggregateFunction窗口函数,自定义UDAF,UDF