Flink AggregateFunction window function: execution steps and an example
Posted on 二十六画生's blog
First group the stream with keyBy, then aggregate over a count window. Example:
package operator;

import java.sql.Timestamp;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class AggregateFunction_2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple3<String, Integer, Timestamp>> dataStream = env.addSource(new SourceFunction<Tuple3<String, Integer, Timestamp>>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple3<String, Integer, Timestamp>> ctx) throws Exception {
                /* Emits one element per second:
                   user1 1
                   user2 2
                   user1 1
                   user4 4
                   user1 1
                   user6 6
                   ...
                */
                int i = 1;
                while (running) {
                    Thread.sleep(1000);
                    Tuple3<String, Integer, Timestamp> t3;
                    if (i % 2 == 1) {
                        // odd i: always user1 with value 1
                        t3 = Tuple3.of("user1", 1, new Timestamp(System.currentTimeMillis()));
                    } else {
                        // even i: a new key "user" + i with value i
                        t3 = Tuple3.of("user" + i, i, new Timestamp(System.currentTimeMillis()));
                    }
                    i = i + 1;
                    ctx.collect(t3);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        /* Alternative bounded input:
        DataStream<Tuple3<String, Integer, Timestamp>> dataStream = env.fromElements(
                Tuple3.of("1", 333, new Timestamp(System.currentTimeMillis())),
                Tuple3.of("2", 111, new Timestamp(System.currentTimeMillis())),
                Tuple3.of("1", 222, new Timestamp(System.currentTimeMillis())),
                Tuple3.of("2", 444, new Timestamp(System.currentTimeMillis())),
                Tuple3.of("9", 444, new Timestamp(System.currentTimeMillis())),
                Tuple3.of("6", 555, new Timestamp(System.currentTimeMillis())),
                Tuple3.of("1", 555, new Timestamp(System.currentTimeMillis())));
        */
        //dataStream.print();

        // AggregateFunction<IN, ACC, OUT>: input type IN, accumulator type ACC, output type OUT
        DataStream<Tuple3<String, Integer, Timestamp>> data_aggregate = dataStream
                // .timeWindowAll(Time.seconds(2))
                // group by user name (same grouping as the deprecated keyBy(0))
                .keyBy(new KeySelector<Tuple3<String, Integer, Timestamp>, String>() {
                    @Override
                    public String getKey(Tuple3<String, Integer, Timestamp> value) {
                        return value.f0;
                    }
                })
                .countWindow(2) // a key's window fires once it holds 2 elements
                // .sum(1);
                .aggregate(new AggregateFunction<Tuple3<String, Integer, Timestamp>, Tuple3<String, Integer, Timestamp>, Tuple3<String, Integer, Timestamp>>() {
                    // Initialize the accumulator: create a new accumulator and start a new aggregation;
                    // this is where the iteration state is initialized.
                    // Runs once when an element arrives and its group holds only that one element.
                    // If the group already holds an element, it is not called again; add and getResult run directly.
                    // The accumulator is an intermediate carrier, a place where the partial sums are kept:
                    // for user1 + user1, acc_1 = acc (initial) + first user1, then acc = acc_1 + second user1.
                    @Override
                    public Tuple3<String, Integer, Timestamp> createAccumulator() {
                        System.out.println("------createAccumulator--------" + new Timestamp(System.currentTimeMillis()));
                        return new Tuple3<>("", 0, new Timestamp(System.currentTimeMillis()));
                    }

                    // Accumulate: runs once per incoming element and folds that element into the accumulator.
                    @Override
                    public Tuple3<String, Integer, Timestamp> add(Tuple3<String, Integer, Timestamp> value, Tuple3<String, Integer, Timestamp> accumulator) {
                        System.out.println("------add--------" + value);
                        accumulator.f0 = value.f0;  // first field: the element's key (f0)
                        accumulator.f1 += value.f1; // second field: running sum
                        return accumulator;
                    }

                    // Result: runs once when the window holds 2 elements and fires; reads the aggregate from the accumulator.
                    @Override
                    public Tuple3<String, Integer, Timestamp> getResult(Tuple3<String, Integer, Timestamp> accumulator) {
                        System.out.println("------getResult--------" + accumulator);
                        return accumulator;
                    }

                    // Merge two accumulators into one holding the combined state.
                    // Not triggered here, since count windows never merge; merging windows such as session windows call it.
                    @Override
                    public Tuple3<String, Integer, Timestamp> merge(Tuple3<String, Integer, Timestamp> a, Tuple3<String, Integer, Timestamp> b) {
                        System.out.println("------merge--------" + a);
                        a.f1 += b.f1;       // combine the partial sums
                        if (a.f0.isEmpty()) {
                            a.f0 = b.f0;    // keep a non-empty key
                        }
                        return a;
                    }
                });

        data_aggregate.print();
        env.execute("execute");
    }
}
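In AggregateFunction_2 the three type parameters IN, ACC and OUT are all Tuple3<String, Integer, Timestamp>, which hides their different roles. The plain-Java sketch below shows a case where they differ: IN is a reading, ACC is a (sum, count) pair, and OUT is the average as a Double. It assumes a hypothetical interface AggFn that only mirrors the four method names of Flink's AggregateFunction; it is not the Flink API and runs without Flink. It also shows what a non-null merge looks like.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch; AggFn is a hypothetical stand-in mirroring AggregateFunction's method names. */
public class AverageSketch {

    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** IN: one reading, e.g. ("user1", 4). */
    static final class Reading {
        final String user;
        final int value;
        Reading(String user, int value) { this.user = user; this.value = value; }
    }

    /** ACC: running sum and count; a different type from both IN and OUT. */
    static final class SumCount {
        long sum;
        long count;
    }

    /** OUT is a Double: the average of every value folded into the accumulator. */
    static final AggFn<Reading, SumCount, Double> AVERAGE = new AggFn<Reading, SumCount, Double>() {
        public SumCount createAccumulator() { return new SumCount(); }
        public SumCount add(Reading r, SumCount acc) { acc.sum += r.value; acc.count++; return acc; }
        public Double getResult(SumCount acc) { return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count; }
        // merge combines two partial accumulators instead of returning null
        public SumCount merge(SumCount a, SumCount b) { a.sum += b.sum; a.count += b.count; return a; }
    };

    /** Folds one window's elements through the accumulator, one add call per element. */
    static double aggregate(List<Reading> window) {
        SumCount acc = AVERAGE.createAccumulator();
        for (Reading r : window) {
            acc = AVERAGE.add(r, acc);
        }
        return AVERAGE.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList(new Reading("user1", 1), new Reading("user1", 4)))); // 2.5
        SumCount left = AVERAGE.add(new Reading("user1", 2), AVERAGE.createAccumulator());
        SumCount right = AVERAGE.add(new Reading("user1", 6), AVERAGE.createAccumulator());
        System.out.println(AVERAGE.getResult(AVERAGE.merge(left, right))); // 4.0
    }
}
```

Keeping (sum, count) in the accumulator rather than a running average is what makes merge possible: two partial averages cannot be combined correctly without their counts.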
Output of AggregateFunction_2 (the notes after "--" explain each step):
------createAccumulator--------2020-10-20 20:52:43.177
------add--------(user1,1,2020-10-20 20:52:43.095) -- user1 arrives; after keyBy its group holds only this user1, so createAccumulator runs first, then add (add works on the freshly created accumulator)
------createAccumulator--------2020-10-20 20:52:44.179
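------add--------(user2,2,2020-10-20 20:52:44.103) -- user2 arrives; its group holds only this user2, so createAccumulator runs first, then add (add works on the freshly created accumulator)
------add--------(user1,1,2020-10-20 20:52:45.103) -- user1 arrives again; its group now holds two user1 elements, which meets the count of 2, so add runs and then getResult (add: second user1 + (first user1 + initial accumulator)). The window is then cleared, because countWindow(2) fires through a purging count trigger that discards the window's state after firing.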
------getResult--------(user1,2,2020-10-20 20:52:43.178)
(user1,2,2020-10-20 20:52:43.178)
------createAccumulator--------2020-10-20 20:52:46.189 -- user4 arrives; its group holds only this user4, so createAccumulator runs, then add (add works on the freshly created accumulator)
------add--------(user4,4,2020-10-20 20:52:46.103)
------createAccumulator--------2020-10-20 20:52:47.195 -- Note: the two earlier user1 elements have already fired and been output, and that window was purged, so this user1 is once again the first element of its group
------add--------(user1,1,2020-10-20 20:52:47.104)
------createAccumulator--------2020-10-20 20:52:48.2 -- user6 arrives; its group holds only this user6, so createAccumulator runs, then add (add works on the freshly created accumulator)
------add--------(user6,6,2020-10-20 20:52:48.104)
------add--------(user1,1,2020-10-20 20:52:49.104) -- another user1 arrives; its group now holds two elements, meeting the count, so add and then getResult run and the result is emitted; the window is purged again
------getResult--------(user1,2,2020-10-20 20:52:47.195)
(user1,2,2020-10-20 20:52:47.195)
------createAccumulator--------2020-10-20 20:52:50.109
------add--------(user8,8,2020-10-20 20:52:50.104)
------createAccumulator--------2020-10-20 20:52:51.114
------add--------(user1,1,2020-10-20 20:52:51.105)
------createAccumulator--------2020-10-20 20:52:52.119
------add--------(user10,10,2020-10-20 20:52:52.105)
------add--------(user1,1,2020-10-20 20:52:53.106)
------getResult--------(user1,2,2020-10-20 20:52:51.114)
(user1,2,2020-10-20 20:52:51.114)
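The call order in this log can be reproduced without a Flink cluster. The plain-Java sketch below is a hypothetical simulation, not Flink's implementation. It relies on the fact that Flink's countWindow(size) on a keyed stream is shorthand for a global window with PurgingTrigger.of(CountTrigger.of(size)), and models that as one accumulator per key: created on the key's first element, updated by add on every element, read with getResult once the count reaches 2, and then discarded. The input mirrors the source for i = 1..11; timestamps are left out, and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of keyBy + countWindow(size) + aggregate; not Flink code. */
public class CountWindowSimulation {

    /** Lifecycle calls, in the order they happen. */
    static final List<String> LOG = new ArrayList<>();

    /** Same role as the Tuple3 accumulator: key plus running sum. */
    static final class Acc {
        String key = "";
        int sum = 0;
    }

    static Acc createAccumulator() {
        LOG.add("createAccumulator");
        return new Acc();
    }

    static Acc add(String key, int value, Acc acc) {
        LOG.add("add (" + key + "," + value + ")");
        acc.key = key;
        acc.sum += value;
        return acc;
    }

    static String getResult(Acc acc) {
        LOG.add("getResult (" + acc.key + "," + acc.sum + ")");
        return "(" + acc.key + "," + acc.sum + ")";
    }

    /** Feeds the source's sequence for i = 1..n into per-key count windows of the given size. */
    static List<String> run(int n, int size) {
        Map<String, Acc> accs = new HashMap<>();       // one live accumulator per key
        Map<String, Integer> counts = new HashMap<>(); // elements in the key's current window
        List<String> results = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            String key = (i % 2 == 1) ? "user1" : "user" + i;
            int value = (i % 2 == 1) ? 1 : i;
            Acc acc = accs.get(key);
            if (acc == null) {
                acc = createAccumulator(); // first element of a new window for this key
            }
            Acc updated = add(key, value, acc);
            accs.put(key, updated);
            int count = counts.merge(key, 1, Integer::sum);
            if (count == size) {
                results.add(getResult(updated));
                accs.remove(key);   // purge: the next element for this key starts a fresh accumulator
                counts.remove(key);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = run(11, 2);
        LOG.forEach(System.out::println);
        System.out.println(results);
    }
}
```

Running main prints the same sequence of createAccumulator, add and getResult calls as the log above, followed by three (user1,2) results. Keys such as user2 and user4 each receive only one element in this run, so their windows never fire.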